Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BigQuery scripts to import/export GeoParquet files #113

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions scripts/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,35 @@ virtualenv:
```
poetry run pip install -U --force-reinstall pygeos --no-binary pygeos
```

### BigQuery

Convert a SQL query to parquet:

```bash
poetry run python bigquery_to_parquet.py \
--input-query "SELECT * FROM carto-do-public-data.carto.geography_usa_blockgroup_2019" \
--primary-column geom \
--partition-size 100 \
--output geography_usa_blockgroup_2019
```

Upload a parquet file or folder to BigQuery:
```bash
poetry run python parquet_to_bigquery.py \
--input geography_usa_blockgroup_2019 \
--output "cartodb-gcp-backend-data-team.alasarr.geography_usa_blockgroup_2019"
```

Instead of using folders, you can also work with a single file, but it might hit [bigquery limits](https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-parquet) when you upload it to BigQuery. For large parquet files you might get `UDF out of memory` errors.

Convert a SQL query to single parquet file:

```bash
poetry run python bigquery_to_parquet.py \
--input-query "SELECT * FROM carto-do-public-data.carto.geography_usa_blockgroup_2019" \
--primary-column geom \
--partition-size 100 \
--mode file \
--output geography_usa_blockgroup_2019.parquet
```
90 changes: 90 additions & 0 deletions scripts/bigquery_to_parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@


import click
import sys
import pyarrow.parquet as pq
import geopandas as gpd

from encoder import AVAILABLE_COMPRESSIONS, Edges, PathType, geopandas_to_arrow
from pathlib import Path
from google.cloud import bigquery

MODES = ["FILE", "FOLDER"]

def read_gdf(input_query: str, primary_column: str):
client = bigquery.Client()
df = client.query(input_query).to_dataframe()
df[primary_column] = gpd.GeoSeries.from_wkt(df[primary_column])
return gpd.GeoDataFrame(df, geometry=primary_column, crs="EPSG:4326")

@click.command()
@click.option(
"-q",
"--input-query",
type=str,
help="SQL query of the data to export",
required=True,
)
@click.option(
"--primary-column",
type=str,
help="The primary column name with geometry data",
required=True,
)
@click.option(
"-o",
"--output",
type=PathType(file_okay=True, dir_okay=True, writable=True),
help="Path to output",
required=True,
)
@click.option(
"-m",
"--mode",
type=click.Choice(MODES, case_sensitive=False),
help="Mode to use FILE or FOLDER",
default="FOLDER",
show_default=True
)
@click.option(
"--compression",
type=click.Choice(AVAILABLE_COMPRESSIONS, case_sensitive=False),
default="SNAPPY",
help="Compression codec to use when writing to Parquet.",
show_default=True,
)
@click.option(
"--partition-size",
type=int,
default=5000,
help="Number of records per partition. Ignored if --single-file is provided.",
show_default=True,
)
def main(input_query: str, primary_column: str, output: Path, mode: str, compression: str , partition_size: int):
print("Reading data from BigQuery", file=sys.stderr)

if mode.upper() == 'FOLDER':
gdf = (
read_gdf(input_query, primary_column)
.assign(__partition__= lambda x: x.index // partition_size)
)
else:
gdf = read_gdf(input_query, primary_column)

print("Finished reading", file=sys.stderr)
print("Starting conversion to Arrow", file=sys.stderr)
arrow_table = geopandas_to_arrow(gdf, Edges.SPHERICAL)
print("Finished conversion to Arrow", file=sys.stderr)

print("Starting write to Parquet", file=sys.stderr)

if mode.upper() == 'FOLDER':
# We need to export to multiple files, because a single file might hit bigquery limits (UDF out of memory). https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-parquet
pq.write_to_dataset(arrow_table, root_path=output, partition_cols=['__partition__'], compression=compression)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should still have a discussion on best practices for partitioned datasets. #79 Specifically around whether the _metadata file is required/suggested/etc.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, will follow the conversation there

else:
pq.write_table(arrow_table, output, compression=compression)

print("Finished write to Parquet", file=sys.stderr)

if __name__ == "__main__":
main()
144 changes: 144 additions & 0 deletions scripts/encoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import json
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List

import click
import geopandas as gpd
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pygeos
from numpy.typing import NDArray

GEOPARQUET_VERSION = "0.4.0"
AVAILABLE_COMPRESSIONS = ["NONE", "SNAPPY", "GZIP", "BROTLI", "LZ4", "ZSTD"]

PygeosGeometryArray = NDArray[pygeos.Geometry]

class Edges(Enum):
PLANAR = 'planar'
SPHERICAL = 'spherical'

class PathType(click.Path):
"""A Click path argument that returns a pathlib Path, not a string"""

def convert(self, value, param, ctx):
return Path(super().convert(value, param, ctx))

class GeometryType(int, Enum):
"""Pygeos (GEOS) geometry type mapping
From https://pygeos.readthedocs.io/en/latest/geometry.html?highlight=type#pygeos.geometry.get_type_id
"""

Missing = -1
Point = 0
LineString = 1
LinearRing = 2
Polygon = 3
MultiPoint = 4
MultiLinestring = 5
MultiPolygon = 6
GeometryCollection = 7


def _parse_to_pygeos(df: gpd.GeoDataFrame) -> Dict[str, PygeosGeometryArray]:
"""Parse to pygeos geometry array

This is split out from _create_metadata so that we don't have to create the pygeos
array twice: once for converting to wkb and another time for metadata handling.
"""
geometry_columns: Dict[str, PygeosGeometryArray] = {}
for col in df.columns[df.dtypes == "geometry"]:
geometry_columns[col] = df[col].array.data

return geometry_columns


def _create_metadata(
df: gpd.GeoDataFrame, geometry_columns: Dict[str, PygeosGeometryArray], edges: Edges
) -> Dict[str, Any]:
"""Create and encode geo metadata dict.

Parameters
----------
df : GeoDataFrame

Returns
-------
dict
"""

# Construct metadata for each geometry
column_metadata = {}
for col, geometry_array in geometry_columns.items():
geometry_type = _get_geometry_type(geometry_array)
bbox = list(pygeos.total_bounds(geometry_array))

series = df[col]
column_metadata[col] = {
"encoding": "WKB",
"geometry_type": geometry_type,
"crs": series.crs.to_json_dict() if series.crs else None,
# We don't specify orientation for now
# "orientation"
"edges": edges.value,
"bbox": bbox,
# I don't know how to get the epoch from a pyproj CRS, and if it's relevant
# here
# "epoch":
}

return {
"version": GEOPARQUET_VERSION,
"primary_column": df._geometry_column_name,
"columns": column_metadata,
# "creator": {"library": "geopandas", "version": geopandas.__version__},
}


def _get_geometry_type(pygeos_geoms: PygeosGeometryArray) -> List[str]:
type_ids = pygeos.get_type_id(pygeos_geoms)
unique_type_ids = set(type_ids)

geom_type_names: List[str] = []
for type_id in unique_type_ids:
geom_type_names.append(GeometryType(type_id).name)

return geom_type_names


def _encode_metadata(metadata: Dict) -> bytes:
"""Encode metadata dict to UTF-8 JSON string

Parameters
----------
metadata : dict

Returns
-------
UTF-8 encoded JSON string
"""
# Remove unnecessary whitespace in JSON metadata
# https://stackoverflow.com/a/33233406
return json.dumps(metadata, separators=(',', ':')).encode("utf-8")


def geopandas_to_arrow(df: gpd.GeoDataFrame, edges:Edges = Edges.PLANAR) -> pa.Table:
geometry_columns = _parse_to_pygeos(df)
geo_metadata = _create_metadata(df, geometry_columns, edges)

df = pd.DataFrame(df)
for col, geometry_array in geometry_columns.items():
df[col] = pygeos.to_wkb(geometry_array)

table = pa.Table.from_pandas(df, preserve_index=False)

metadata = table.schema.metadata
metadata.update({b"geo": _encode_metadata(geo_metadata)})
return table.replace_schema_metadata(metadata)




109 changes: 109 additions & 0 deletions scripts/parquet_to_bigquery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import click
import json
import glob
import pyarrow.parquet as pq

from encoder import PathType
from pathlib import Path
from google.cloud import bigquery
from encoder import Edges

def upload_parquet_file(client: bigquery.Client, file: Path, write_disposition: str, dst: str):
"""Upload a parquet file to BigQuery"""
job_config = bigquery.LoadJobConfig(
source_format = bigquery.SourceFormat.PARQUET,
write_disposition = write_disposition,
)

with open(file, "rb") as source_file:
print(f"Uploading file {file}")
job = client.load_table_from_file(source_file, dst, job_config=job_config)

job.result() # Waits for the job to complete.

def validate_metadata(metadata):
"""Validate metadata"""
if metadata is None or b"geo" not in metadata:
raise ValueError("Missing geo metadata")
alasarr marked this conversation as resolved.
Show resolved Hide resolved

geo = json.loads(metadata[b"geo"])

if (geo["primary_column"] not in geo["columns"]):
raise ValueError("Primary column not found")

for column_name, column_meta in geo["columns"].items():
encoding = column_meta["encoding"]
edges = column_meta["edges"]
if encoding != 'WKB':
raise ValueError(f"Not supported encoding {encoding} for column {column_name}")
if edges != Edges.SPHERICAL.value:
raise ValueError(f"Only spherical edges are supported")

@click.command()
@click.option(
"-i",
"--input",
type=PathType(exists=True, readable=True),
help="Path to a parquet file or a folder with multiple parquet files inside (it requires extension *.parquet).",
required=True,
)
@click.option(
"-o",
"--output",
type=str,
help="FQN of the destination table (project.dataset.table).",
required=True,
)
def main(input: Path, output: str):
primary_column = None
tmp_output = f"{output}_tmp"
metadata = None
client = bigquery.Client()

if input.is_dir():
# A folder is detected
first_file = True

for file in glob.glob(f"{input}/**/*.parquet",recursive=True):

if first_file:
# First file determines the schema and truncates the table
metadata = pq.read_schema(file).metadata
validate_metadata(metadata)
write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
first_file = False
else:
# other files will append
write_disposition = bigquery.WriteDisposition.WRITE_APPEND

upload_parquet_file(client, file, write_disposition, tmp_output)
else:
# Single file mode
metadata = pq.read_schema(input).metadata
validate_metadata(metadata)
write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
upload_parquet_file(client, input, write_disposition, tmp_output)

metadata_geo = json.loads(metadata[b"geo"])
primary_column = metadata_geo["primary_column"]
geo_columns = list(metadata_geo["columns"].keys())
wkb_columns_expression = map(lambda c: f"ST_GEOGFROMWKB({c}) as {c}", geo_columns)

## Convert to geography the file(s) imported
sql = f"""
DROP TABLE IF EXISTS {output};
CREATE TABLE {output} CLUSTER BY {primary_column}
AS SELECT * EXCEPT({", ".join(geo_columns)}),
{", ".join(wkb_columns_expression)}
FROM {tmp_output};
DROP TABLE IF EXISTS {tmp_output};
"""

query_job = client.query(sql)
query_job.result() # Waits for job to complete.

table = client.get_table(output)
print(f"Loaded {table.num_rows} rows and {len(table.schema)} columns to {output}")

if __name__ == "__main__":
main()
Loading