feat: gtfs and gbfs validation reports bq data ingestion (#702)
cka-y authored Aug 22, 2024
1 parent 406304f commit e1823aa
Showing 36 changed files with 1,986 additions and 10 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -72,4 +72,5 @@ __pycache__

# Test coverage results
.coverage
coverage_reports
coverage_reports
tf.plan
10 changes: 10 additions & 0 deletions functions-python/big_query_ingestion/.coveragerc
@@ -0,0 +1,10 @@
[run]
omit =
*/test*/*
*/helpers/*
*/database_gen/*
*/dataset_service/*

[report]
exclude_lines =
if __name__ == .__main__.:
40 changes: 40 additions & 0 deletions functions-python/big_query_ingestion/README.md
@@ -0,0 +1,40 @@
# GTFS and GBFS Data Transfer Cloud Functions

This directory includes two HTTP-triggered Google Cloud Functions designed to transfer GTFS and GBFS data into Google BigQuery.

## Overview

These Cloud Functions automate the transfer of GTFS and GBFS data into BigQuery through the following steps (condensed in the sketch after this list):

1. **Retrieving**: Collecting all NDJSON files from a Google Cloud Storage bucket.
2. **Loading**: Ingesting the NDJSON data into a new BigQuery table. The table name includes a date string suffix, ensuring that data is grouped by date (with GTFS and GBFS data handled separately).
3. **Cleaning Up**: Deleting the processed NDJSON files from the bucket to prevent reprocessing.
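
A condensed sketch of this flow, taken from `src/common/bq_data_transfer.py` later in this commit (error handling and logging omitted):

```python
# Condensed view of the flow implemented in src/common/bq_data_transfer.py
# (error handling and logging omitted).
from google.cloud import bigquery, storage
from google.cloud.bigquery.job import LoadJobConfig, SourceFormat


def transfer(bucket_name: str, table_ref: bigquery.TableReference) -> None:
    storage_client = storage.Client()
    bq_client = bigquery.Client()

    # 1. Retrieve: collect every NDJSON blob under the "ndjson" prefix.
    blobs = list(storage_client.list_blobs(bucket_name, prefix="ndjson"))
    source_uris = [f"gs://{bucket_name}/{blob.name}" for blob in blobs]

    # 2. Load: ingest the NDJSON files into the dated BigQuery table.
    job_config = LoadJobConfig()
    job_config.source_format = SourceFormat.NEWLINE_DELIMITED_JSON
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
    bq_client.load_table_from_uri(
        source_uris, table_ref, job_config=job_config
    ).result()

    # 3. Clean up: delete the processed blobs so they are not re-ingested.
    for blob in blobs:
        blob.delete()
```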

### Cloud Functions

- **`ingest_data_to_big_query_gtfs`**: Handles the transfer of GTFS data to BigQuery.
- **`ingest_data_to_big_query_gbfs`**: Handles the transfer of GBFS data to BigQuery.

Both functions are triggered via HTTP and can be invoked manually or automatically by a Cloud Scheduler job on a predefined schedule.
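
A manual invocation is an authenticated HTTP POST to the deployed function. The sketch below is illustrative only: the URL is a placeholder, and because the deployment config in this commit restricts ingress (`ALLOW_INTERNAL_AND_GCLB`), a direct call may only succeed from inside the project's network or through the load balancer.

```python
# Illustrative manual trigger; the function URL is a placeholder.
import requests
from google.auth.transport.requests import Request
from google.oauth2 import id_token

FUNCTION_URL = "https://<region>-<project>.cloudfunctions.net/ingest-data-to-big-query-gtfs"

# Fetch an OIDC identity token whose audience is the function URL.
token = id_token.fetch_id_token(Request(), FUNCTION_URL)
response = requests.post(FUNCTION_URL, headers={"Authorization": f"Bearer {token}"})
print(response.status_code, response.text)
```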

## Project Structure

- **`main.py`**: Defines the HTTP-triggered Cloud Functions that initiate the GTFS and GBFS data transfer processes.
- **`gbfs_big_query_ingest.py`**: Contains the logic for retrieving NDJSON files, loading GBFS data into BigQuery, and deleting the processed files.
- **`gtfs_big_query_ingest.py`**: Contains the logic for retrieving NDJSON files, loading GTFS data into BigQuery, and deleting the processed files.
- **`common/`**: Shared utilities and helper functions for interacting with Google Cloud Storage and BigQuery.
- **`tests/`**: Unit tests for all modules and functions, ensuring correct functionality and robustness.

## Function Configuration

The following environment variables are required for the functions to operate:

- **`PROJECT_ID`**: Google Cloud project ID where the BigQuery dataset and table reside.
- **`BUCKET_NAME`**: Name of the Google Cloud Storage bucket where the NDJSON files are stored.
- **`DATASET_ID`**: BigQuery dataset ID where the NDJSON data will be loaded.
- **`TABLE_ID`**: Prefix for the BigQuery table ID. The actual table name includes a date string suffix (see the sketch after this list).
- **`BQ_DATASET_LOCATION`**: Location of the BigQuery dataset.
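
As implemented in `src/common/bq_data_transfer.py` further down in this commit, the `TABLE_ID` prefix is combined with the current date to form the table name; the example value in the comment below is hypothetical.

```python
# Table naming convention from bq_data_transfer.py; the TABLE_ID value is hypothetical.
import os
from datetime import datetime

table_id = f"{os.getenv('TABLE_ID')}_{datetime.now().strftime('%Y%m%d')}"
# e.g. TABLE_ID="gtfs_validation_report" on 2024-08-22 -> "gtfs_validation_report_20240822"
```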

## Local Development

For local development, follow the same steps as for other functions in the project. Please refer to the [README.md](../README.md) file in the parent directory for detailed instructions.
20 changes: 20 additions & 0 deletions functions-python/big_query_ingestion/function_config.json
@@ -0,0 +1,20 @@
{
"name": "ingest-data-to-big-query",
"description": "Ingest data to BigQuery",
"entry_point": "ingest_data_to_big_query",
"timeout": 540,
"memory": "8Gi",
"trigger_http": false,
"include_folders": ["database_gen", "helpers"],
"environment_variables": [],
"secret_environment_variables": [
{
"key": "FEEDS_DATABASE_URL"
}
],
"ingress_settings": "ALLOW_INTERNAL_AND_GCLB",
"max_instance_request_concurrency": 1,
"max_instance_count": 1,
"min_instance_count": 0,
"available_cpu": 2
}
15 changes: 15 additions & 0 deletions functions-python/big_query_ingestion/requirements.txt
@@ -0,0 +1,15 @@
functions-framework==3.*
google-cloud-logging
google-cloud-bigquery
google-cloud-storage
psycopg2-binary==2.9.6
aiohttp~=3.8.6
asyncio~=3.4.3
urllib3~=2.1.0
SQLAlchemy==2.0.23
geoalchemy2==0.14.7
requests~=2.31.0
attrs~=23.1.0
pluggy~=1.3.0
certifi~=2023.7.22
pandas
4 changes: 4 additions & 0 deletions functions-python/big_query_ingestion/requirements_dev.txt
@@ -0,0 +1,4 @@
Faker
pytest~=7.4.3
urllib3-mock
requests-mock
Empty file.
110 changes: 110 additions & 0 deletions functions-python/big_query_ingestion/src/common/bq_data_transfer.py
@@ -0,0 +1,110 @@
import logging
import os
from datetime import datetime

from google.cloud import bigquery, storage
from google.cloud.bigquery.job import LoadJobConfig, SourceFormat

from helpers.bq_schema.schema import json_schema_to_bigquery, load_json_schema

# Environment variables
project_id = os.getenv("PROJECT_ID")
bucket_name = os.getenv("BUCKET_NAME")
dataset_id = os.getenv("DATASET_ID")
table_id = f"{os.getenv('TABLE_ID')}_{datetime.now().strftime('%Y%m%d')}"
dataset_location = os.getenv("BQ_DATASET_LOCATION")


class BigQueryDataTransfer:
"""Base class for BigQuery data transfer."""

def __init__(self):
self.bigquery_client = bigquery.Client(project=project_id)
self.storage_client = storage.Client(project=project_id)
self.schema_path = None
self.nd_json_path_prefix = "ndjson"

def create_bigquery_dataset(self):
"""Creates a BigQuery dataset if it does not exist."""
dataset_ref = bigquery.DatasetReference(project_id, dataset_id)
try:
self.bigquery_client.get_dataset(dataset_ref)
logging.info(f"Dataset {dataset_id} already exists.")
except Exception:
dataset = bigquery.Dataset(dataset_ref)
dataset.location = dataset_location
self.bigquery_client.create_dataset(dataset)
logging.info(f"Created dataset {dataset_id}.")

def create_bigquery_table(self):
"""Creates a BigQuery table if it does not exist."""
dataset_ref = bigquery.DatasetReference(project_id, dataset_id)
table_ref = dataset_ref.table(table_id)

try:
self.bigquery_client.get_table(table_ref)
logging.info(f"Table {table_id} already exists.")
except Exception:
if self.schema_path is None:
raise Exception("Schema path is not provided")
json_schema = load_json_schema(self.schema_path)
schema = json_schema_to_bigquery(json_schema)

table = bigquery.Table(table_ref, schema=schema)
table = self.bigquery_client.create_table(table)
logging.info(
f"Created table {table.project}.{table.dataset_id}.{table.table_id}"
)

def load_data_to_bigquery(self):
"""Loads data from Cloud Storage to BigQuery."""
dataset_ref = bigquery.DatasetReference(project_id, dataset_id)
table_ref = dataset_ref.table(table_id)
source_uris = []
# Get the list of blobs in the bucket
blobs = list(
self.storage_client.list_blobs(bucket_name, prefix=self.nd_json_path_prefix)
)
for blob in blobs:
uri = f"gs://{bucket_name}/{blob.name}"
source_uris.append(uri)
logging.info(f"Found {len(source_uris)} files to load to BigQuery.")

if len(source_uris) > 0:
# Load the data to BigQuery
job_config = LoadJobConfig()
job_config.source_format = SourceFormat.NEWLINE_DELIMITED_JSON
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

load_job = self.bigquery_client.load_table_from_uri(
source_uris, table_ref, job_config=job_config
)
try:
load_job.result() # Wait for the job to complete
logging.info(
f"Loaded {len(source_uris)} files into "
f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}"
)
# If successful, delete the blobs
for blob in blobs:
blob.delete()
logging.info(f"Deleted blob: {blob.name}")
except Exception as e:
logging.error(f"An error occurred while loading data to BigQuery: {e}")
for error in load_job.errors:
logging.error(f"Error: {error['message']}")
if "location" in error:
logging.error(f"Location: {error['location']}")
if "reason" in error:
logging.error(f"Reason: {error['reason']}")

def send_data_to_bigquery(self):
"""Full process to send data to BigQuery."""
try:
self.create_bigquery_dataset()
self.create_bigquery_table()
self.load_data_to_bigquery()
return "Data successfully loaded to BigQuery", 200
except Exception as e:
logging.error(f"An error occurred: {e}")
return f"Error while loading data: {e}", 500
14 changes: 14 additions & 0 deletions functions-python/big_query_ingestion/src/gbfs/gbfs_big_query_ingest.py
@@ -0,0 +1,14 @@
import os

from ..common.bq_data_transfer import BigQueryDataTransfer


class BigQueryDataTransferGBFS(BigQueryDataTransfer):
"""BigQuery data transfer for GBFS data"""

def __init__(self):
super().__init__()
current_dir = os.path.dirname(os.path.abspath(__file__))
self.schema_path = os.path.join(
current_dir, "../helpers/bq_schema/gbfs_schema.json"
)
14 changes: 14 additions & 0 deletions functions-python/big_query_ingestion/src/gtfs/gtfs_big_query_ingest.py
@@ -0,0 +1,14 @@
import os

from ..common.bq_data_transfer import BigQueryDataTransfer


class BigQueryDataTransferGTFS(BigQueryDataTransfer):
"""BigQuery data transfer for GTFS data"""

def __init__(self):
super().__init__()
current_dir = os.path.dirname(os.path.abspath(__file__))
self.schema_path = os.path.join(
current_dir, "../helpers/bq_schema/gtfs_schema.json"
)
25 changes: 25 additions & 0 deletions functions-python/big_query_ingestion/src/main.py
@@ -0,0 +1,25 @@
import logging

import functions_framework

from helpers.logger import Logger
from .gbfs.gbfs_big_query_ingest import BigQueryDataTransferGBFS
from .gtfs.gtfs_big_query_ingest import BigQueryDataTransferGTFS

logging.basicConfig(level=logging.INFO)


@functions_framework.http
def ingest_data_to_big_query_gtfs(_):
"""Google Storage to Big Query data ingestion for GTFS data"""
Logger.init_logger()
logging.info("Function triggered")
return BigQueryDataTransferGTFS().send_data_to_bigquery()


@functions_framework.http
def ingest_data_to_big_query_gbfs(_):
"""Google Storage to Big Query data ingestion for GBFS data"""
Logger.init_logger()
logging.info("Function triggered")
return BigQueryDataTransferGBFS().send_data_to_bigquery()
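
Because both handlers ignore their request argument, a rough local smoke test can call them directly. This assumes the environment variables listed in the README, Google Cloud application-default credentials, and the shared `helpers` package are available on the import path.

```python
# Rough local smoke test; assumes PROJECT_ID, BUCKET_NAME, DATASET_ID, TABLE_ID,
# BQ_DATASET_LOCATION and application-default credentials are configured, and that
# the shared helpers package is importable.
from big_query_ingestion.src.main import ingest_data_to_big_query_gtfs

body, status = ingest_data_to_big_query_gtfs(None)  # the request argument is unused
print(status, body)
```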
127 changes: 127 additions & 0 deletions functions-python/big_query_ingestion/tests/test_common.py
@@ -0,0 +1,127 @@
import unittest
from unittest.mock import patch, MagicMock

from google.cloud import bigquery

from big_query_ingestion.src.common.bq_data_transfer import BigQueryDataTransfer


class TestBigQueryDataTransfer(unittest.TestCase):
@patch("google.cloud.storage.Client")
@patch("big_query_ingestion.src.common.bq_data_transfer.bigquery.Client")
def setUp(self, mock_bq_client, mock_storage_client):
self.transfer = BigQueryDataTransfer()
self.transfer.schema_path = "fake_schema_path.json"
self.mock_bq_client = mock_bq_client
self.mock_storage_client = mock_storage_client

@patch("big_query_ingestion.src.common.bq_data_transfer.bigquery.DatasetReference")
def test_create_bigquery_dataset_exists(self, _):
self.mock_bq_client().get_dataset.return_value = True
self.transfer.create_bigquery_dataset()

self.mock_bq_client().get_dataset.assert_called_once()
self.mock_bq_client().create_dataset.assert_not_called()

@patch("big_query_ingestion.src.common.bq_data_transfer.bigquery.DatasetReference")
def test_create_bigquery_dataset_not_exists(self, _):
self.mock_bq_client().get_dataset.side_effect = Exception("Dataset not found")

self.transfer.create_bigquery_dataset()

self.mock_bq_client().get_dataset.assert_called_once()
self.mock_bq_client().create_dataset.assert_called_once()

@patch("big_query_ingestion.src.common.bq_data_transfer.load_json_schema")
@patch("big_query_ingestion.src.common.bq_data_transfer.json_schema_to_bigquery")
@patch("big_query_ingestion.src.common.bq_data_transfer.bigquery.DatasetReference")
def test_create_bigquery_table_not_exists(
self, _, mock_json_schema_to_bigquery, mock_load_json_schema
):
self.mock_bq_client().get_table.side_effect = Exception("Table not found")
mock_load_json_schema.return_value = {
"fields": [{"name": "field1", "type": "STRING"}]
}
mock_json_schema_to_bigquery.return_value = [
bigquery.SchemaField("field1", "STRING", mode="NULLABLE")
]

self.transfer.create_bigquery_table()

self.mock_bq_client().get_table.assert_called_once()
mock_load_json_schema.assert_called_once_with(self.transfer.schema_path)
mock_json_schema_to_bigquery.assert_called_once()
self.mock_bq_client().create_table.assert_called_once()

@patch("big_query_ingestion.src.common.bq_data_transfer.bigquery.DatasetReference")
def test_create_bigquery_table_exists(self, _):
self.mock_bq_client().get_table.return_value = True

self.transfer.create_bigquery_table()

self.mock_bq_client().get_table.assert_called_once()
self.mock_bq_client().create_table.assert_not_called()

@patch("big_query_ingestion.src.common.bq_data_transfer.bigquery.DatasetReference")
def test_load_data_to_bigquery(self, _):
mock_blob = MagicMock()
mock_blob.name = "file1.ndjson"
self.mock_storage_client().list_blobs.return_value = [mock_blob]

mock_load_job = MagicMock()
self.mock_bq_client().load_table_from_uri.return_value = mock_load_job

self.transfer.load_data_to_bigquery()

self.mock_storage_client().list_blobs.assert_called_once()
self.mock_bq_client().load_table_from_uri.assert_called_once()
mock_load_job.result.assert_called_once()

@patch("big_query_ingestion.src.common.bq_data_transfer.bigquery.DatasetReference")
def test_load_data_to_bigquery_error(self, _):
mock_blob = MagicMock()
mock_blob.name = "file1.ndjson"
self.mock_storage_client().list_blobs.return_value = [mock_blob]

mock_load_job = MagicMock()
mock_load_job.result.side_effect = Exception("Load job failed")
self.mock_bq_client().load_table_from_uri.return_value = mock_load_job

with self.assertLogs(level="ERROR") as log:
self.transfer.load_data_to_bigquery()

self.assertIn(
"An error occurred while loading data to BigQuery: Load job failed",
log.output[0],
)

@patch(
"big_query_ingestion.src.common.bq_data_transfer.BigQueryDataTransfer.create_bigquery_dataset"
)
@patch(
"big_query_ingestion.src.common.bq_data_transfer.BigQueryDataTransfer.create_bigquery_table"
)
@patch(
"big_query_ingestion.src.common.bq_data_transfer.BigQueryDataTransfer.load_data_to_bigquery"
)
def test_send_data_to_bigquery_success(
self, mock_load_data, mock_create_table, mock_create_dataset
):
response, status = self.transfer.send_data_to_bigquery()

mock_create_dataset.assert_called_once()
mock_create_table.assert_called_once()
mock_load_data.assert_called_once()
self.assertEqual(status, 200)
self.assertEqual(response, "Data successfully loaded to BigQuery")

@patch(
"big_query_ingestion.src.common.bq_data_transfer.BigQueryDataTransfer.create_bigquery_dataset",
side_effect=Exception("Dataset creation failed"),
)
def test_send_data_to_bigquery_failure(self, mock_create_dataset):
response, status = self.transfer.send_data_to_bigquery()

mock_create_dataset.assert_called_once()
self.assertEqual(status, 500)
self.assertIn("Error while loading data: Dataset creation failed", response)