S3 download/upload support for xlsx, xlsm and geojson #88

Merged: 11 commits merged on Nov 1, 2023
152 changes: 122 additions & 30 deletions nesta_ds_utils/loading_saving/S3.py
@@ -1,9 +1,9 @@
import io
from typing import List
from xmlrpc.client import Boolean
import boto3
from fnmatch import fnmatch
import pandas as pd
import geopandas as gpd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
@@ -46,9 +46,34 @@ def _df_to_fileobj(df_data: pd.DataFrame, path_to: str, **kwargs) -> io.BytesIO:
df_data.to_csv(buffer, **kwargs)
elif fnmatch(path_to, "*.parquet"):
df_data.to_parquet(buffer, **kwargs)
elif fnmatch(path_to, "*.xlsx"):
df_data.to_excel(buffer, **kwargs)
elif fnmatch(path_to, "*.xlsm"):
df_data.to_excel(buffer, **kwargs)
else:
raise Exception(
"Uploading dataframe currently supported only for 'csv' and 'parquet'."
raise NotImplementedError(
"Uploading dataframe currently supported only for 'csv', 'parquet', 'xlsx' and xlsm'."
)
buffer.seek(0)
return buffer


def _gdf_to_fileobj(df_data: gpd.GeoDataFrame, path_to: str, **kwargs) -> io.BytesIO:
"""Convert GeoDataFrame into bytes file object.

Args:
df_data (gpd.GeoDataFrame): GeoDataFrame to convert.
path_to (str): Saving file name.

Returns:
io.BytesIO: Bytes file object.
"""
buffer = io.BytesIO()
if fnmatch(path_to, "*.geojson"):
df_data.to_file(buffer, driver="GeoJSON", **kwargs)
else:
raise NotImplementedError(
"Uploading geodataframe currently supported only for 'geojson'."
)
buffer.seek(0)
return buffer
@@ -67,8 +92,30 @@ def _dict_to_fileobj(dict_data: dict, path_to: str, **kwargs) -> io.BytesIO:
buffer = io.BytesIO()
if fnmatch(path_to, "*.json"):
buffer.write(json.dumps(dict_data, **kwargs).encode())
elif fnmatch(path_to, "*.geojson"):
if "type" in dict_data:
if dict_data["type"] in [
"Point",
"MultiPoint",
"LineString",
"MultiLineString",
"Polygon",
"MultiPolygon",
"GeometryCollection",
"Feature",
"FeatureCollection",
]:
buffer.write(json.dumps(dict_data, **kwargs).encode())
else:
raise AttributeError(
"GeoJSONS must have a member with the name 'type', the value of the member must "
"be one of the following: 'Point', 'MultiPoint', 'LineString', 'MultiLineString',"
"'Polygon', 'MultiPolygon','GeometryCollection', 'Feature' or 'FeatureCollection'."
)
else:
raise Exception("Uploading dictionary currently supported only for 'json'.")
raise NotImplementedError(
"Uploading dictionary currently supported only for 'json' and 'geojson'."
)
buffer.seek(0)
return buffer

@@ -92,7 +139,7 @@ def _list_to_fileobj(list_data: list, path_to: str, **kwargs) -> io.BytesIO:
elif fnmatch(path_to, "*.json"):
buffer.write(json.dumps(list_data, **kwargs).encode())
else:
raise Exception(
raise NotImplementedError(
"Uploading list currently supported only for 'csv', 'txt' and 'json'."
)
buffer.seek(0)
@@ -112,7 +159,9 @@ def _str_to_fileobj(str_data: str, path_to: str, **kwargs) -> io.BytesIO:
if fnmatch(path_to, "*.txt"):
buffer = io.BytesIO(bytes(str_data.encode("utf-8")))
else:
raise Exception("Uploading string currently supported only for 'txt'.")
raise NotImplementedError(
"Uploading string currently supported only for 'txt'."
)
buffer.seek(0)
return buffer

@@ -135,7 +184,7 @@ def _np_array_to_fileobj(
elif fnmatch(path_to, "*.parquet"):
pq.write_table(pa.table({"data": np_array_data}), buffer, **kwargs)
else:
raise Exception(
raise NotImplementedError(
"Uploading numpy array currently supported only for 'csv' and 'parquet."
)
buffer.seek(0)
@@ -156,7 +205,7 @@ def _unsupp_data_to_fileobj(data: any, path_to: str, **kwargs) -> io.BytesIO:
if fnmatch(path_to, "*.pkl"):
pickle.dump(data, buffer, **kwargs)
else:
raise Exception(
raise NotImplementedError(
"This file type is not supported for this data. Use 'pkl' instead."
)
buffer.seek(0)
@@ -180,7 +229,9 @@ def upload_obj(
kwargs_writing (dict, optional): Dictionary of kwargs for writing data.

"""
if isinstance(obj, pd.DataFrame):
if isinstance(obj, gpd.base.GeoPandasBase):
obj = _gdf_to_fileobj(obj, path_to, **kwargs_writing)
elif isinstance(obj, pd.DataFrame):
obj = _df_to_fileobj(obj, path_to, **kwargs_writing)
elif isinstance(obj, dict):
obj = _dict_to_fileobj(obj, path_to, **kwargs_writing)
@@ -194,7 +245,7 @@
obj = _unsupp_data_to_fileobj(obj, path_to, **kwargs_writing)
warnings.warn(
"Data uploaded as pickle. Please consider other accessible "
"file types among the suppoted ones."
"file types among the supported ones."
)

s3 = boto3.client("s3")
@@ -215,6 +266,26 @@ def _fileobj_to_df(fileobj: io.BytesIO, path_from: str, **kwargs) -> pd.DataFrame:
return pd.read_csv(fileobj, **kwargs)
elif fnmatch(path_from, "*.parquet"):
return pd.read_parquet(fileobj, **kwargs)
elif fnmatch(path_from, "*.xlsx"):
return pd.read_excel(fileobj, **kwargs)
elif fnmatch(path_from, "*.xlsm"):
return pd.read_excel(fileobj, **kwargs)


def _fileobj_to_gdf(fileobj: io.BytesIO, path_from: str, **kwargs) -> gpd.GeoDataFrame:
"""Convert bytes file object into geodataframe.

Args:
fileobj (io.BytesIO): Bytes file object.
path_from (str): Path of loaded data.

Returns:
gpd.GeoDataFrame: Data as geodataframe.
"""
if fnmatch(path_from, "*.geojson"):
return gpd.GeoDataFrame.from_features(
json.loads(fileobj.getvalue().decode())["features"]
)


def _fileobj_to_dict(fileobj: io.BytesIO, path_from: str, **kwargs) -> dict:
@@ -294,65 +365,86 @@ def download_obj(
bucket (str): Bucket's name.
path_from (str): Path to data in S3.
download_as (str, optional): Type of object downloading. Choose between
('dataframe', 'dict', 'list', 'str', 'np.array'). Not needed for 'pkl files'.
('dataframe', 'geodf', 'dict', 'list', 'str', 'np.array'). Not needed for 'pkl files'.
kwargs_boto (dict, optional): Dictionary of kwargs for boto3 function 'download_fileobj'.
kwargs_reading (dict, optional): Dictionary of kwargs for reading data.

Returns:
any: Donwloaded data.
any: Downloaded data.
"""
if not path_from.endswith(tuple([".csv", ".parquet", ".json", ".txt", ".pkl"])):
raise Exception(
if not path_from.endswith(
tuple(
[".csv", ".parquet", ".json", ".txt", ".pkl", ".geojson", ".xlsx", ".xlsm"]
)
):
raise NotImplementedError(
"This file type is not currently supported for download in memory."
)
s3 = boto3.client("s3")
fileobj = io.BytesIO()
s3.download_fileobj(bucket, path_from, fileobj, **kwargs_boto)
fileobj.seek(0)
if download_as == "dataframe":
if path_from.endswith(tuple([".csv", ".parquet"])):
if not download_as:
if path_from.endswith(tuple([".pkl"])):
return pickle.load(fileobj, **kwargs_reading)
else:
raise ValueError("'download_as' is required for this file type.")
elif download_as == "dataframe":
if path_from.endswith(tuple([".csv", ".parquet", ".xlsx", ".xlsm"])):
return _fileobj_to_df(fileobj, path_from, **kwargs_reading)
else:
raise Exception(
raise NotImplementedError(
"Download as dataframe currently supported only "
"for 'csv' and 'parquet'."
"for 'csv','parquet','xlsx' and 'xlsm'."
)
elif download_as == "geodf":
if path_from.endswith(tuple([".geojson"])):
return _fileobj_to_gdf(fileobj, path_from, **kwargs_reading)
else:
raise NotImplementedError(
"Download as geodataframe currently supported only " "for 'geojson'."
)
elif download_as == "dict":
if path_from.endswith(tuple([".json"])):
return _fileobj_to_dict(fileobj, path_from, **kwargs_reading)
elif path_from.endswith(tuple([".geojson"])):
warnings.warn(
"Please check geojson has a member with the name 'type', the value of the member must be one of the following:"
"'Point', 'MultiPoint', 'LineString', 'MultiLineString', 'Polygon', 'MultiPolygon', 'GeometryCollection',"
"'Feature' and 'FeatureCollection'. Else downloaded dictionary will not be valid geojson."
)
return _fileobj_to_dict(fileobj, path_from, **kwargs_reading)
else:
raise Exception(
"Download as dictionary currently supported only " "for 'json'."
raise NotImplementedError(
"Download as dictionary currently supported only "
"for 'json' and 'geojson'."
)
elif download_as == "list":
if path_from.endswith(tuple([".csv", ".txt", ".json"])):
return _fileobj_to_list(fileobj, path_from, **kwargs_reading)
else:
raise Exception(
raise NotImplementedError(
"Download as list currently supported only "
"for 'csv', 'txt' and 'json'."
)
elif download_as == "str":
if path_from.endswith(tuple([".txt"])):
return _fileobj_to_str(fileobj)
else:
raise Exception("Download as string currently supported only " "for 'txt'.")
raise NotImplementedError(
"Download as string currently supported only " "for 'txt'."
)
elif download_as == "np.array":
if path_from.endswith(tuple([".csv", ".parquet"])):
return _fileobj_to_np_array(fileobj, path_from, **kwargs_reading)
else:
raise Exception(
raise NotImplementedError(
"Download as numpy array currently supported only "
"for 'csv' and 'parquet'."
)
elif not download_as:
if path_from.endswith(tuple([".pkl"])):
return pickle.load(fileobj, **kwargs_reading)
else:
raise Exception("'download_as' is required for this file type.")
else:
raise Exception(
"'download_as' not provided. Choose between ('dataframe', "
raise ValueError(
"'download_as' not provided. Choose between ('dataframe', 'geodf', "
"'dict', 'list', 'str', 'np.array'). Not needed for 'pkl files'.'"
)

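A minimal usage sketch of the new formats from the caller's side; the bucket name, key paths and sample data below are hypothetical, and the argument order for upload_obj is assumed from the existing docstrings rather than shown in this diff:

import pandas as pd
import geopandas as gpd
from shapely.geometry import Point
from nesta_ds_utils.loading_saving import S3

# Hypothetical sample data
df = pd.DataFrame({"col1": ["name1", "name2"]})
gdf = gpd.GeoDataFrame(
    {"col1": ["name1", "name2"]},
    geometry=[Point(1.0, 2.0), Point(2.0, 1.0)],
)

# Upload: the writer is picked from the file extension (xlsx/xlsm need openpyxl)
S3.upload_obj(df, "my-bucket", "data/dummy_dataframe.xlsx")
S3.upload_obj(gdf, "my-bucket", "data/dummy_dataframe.geojson")

# Download: 'download_as' selects the in-memory type
df_back = S3.download_obj("my-bucket", "data/dummy_dataframe.xlsx", download_as="dataframe")
gdf_back = S3.download_obj("my-bucket", "data/dummy_dataframe.geojson", download_as="geodf")
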
4 changes: 1 addition & 3 deletions nesta_ds_utils/loading_saving/file_ops.py
@@ -1,9 +1,7 @@
from typing import Union
from pathlib import Path
from xmlrpc.client import Boolean
import zipfile
import os
from fnmatch import fnmatch


def _convert_str_to_pathlib_path(path: Union[Path, str]) -> Path:
@@ -32,7 +30,7 @@ def make_path_if_not_exist(path: Union[Path, str]):
def extractall(
zip_path: Union[Path, str],
out_path: Union[Path, str] = None,
delete_zip: Boolean = True,
delete_zip: bool = True,
):
"""Takes path to zipped file and extracts it to specified output path.

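The only functional change here is the Boolean-to-bool annotation; for context, a quick sketch of the call it affects, with hypothetical paths:

from nesta_ds_utils.loading_saving.file_ops import extractall

# Extract into a target directory and keep the source archive
extractall("data/archive.zip", "data/extracted", delete_zip=False)
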
10 changes: 10 additions & 0 deletions setup.cfg
@@ -20,6 +20,10 @@ install_requires =
[options.extras_require]
s3 =
boto3==1.24.93
gis =
geopandas==0.13.2
io_extras =
openpyxl==3.0.9
viz =
altair==4.2.0
altair-saver==0.5.0
@@ -34,6 +38,8 @@ test =
pytest==7.1.3
moto[s3]==4.0.7
%(s3)s
%(gis)s
%(io_extras)s
%(viz)s
%(networks)s
%(nlp)s
@@ -51,11 +57,15 @@ dev =
pre-commit-hooks==4.3.0
black==22.10.0
%(s3)s
%(gis)s
%(io_extras)s
%(viz)s
%(networks)s
%(nlp)s
all =
%(s3)s
%(gis)s
%(io_extras)s
%(viz)s
%(networks)s
%(nlp)s
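The new extras group the added optional dependencies; assuming the distribution is published or installed under the package name, they could be requested at install time, for example:

pip install "nesta_ds_utils[s3,gis,io_extras]"
pip install "nesta_ds_utils[all]"
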
7 changes: 7 additions & 0 deletions tests/artifacts/dummy_dataframe.geojson
@@ -0,0 +1,7 @@
{
"type": "FeatureCollection",
"features": [
{ "type": "Feature", "properties": { "col1": "name1" }, "geometry": { "type": "Point", "coordinates": [ 1.0, 2.0 ] } },
{ "type": "Feature", "properties": { "col1": "name2" }, "geometry": { "type": "Point", "coordinates": [ 2.0, 1.0 ] } }
]
}
Binary file added tests/artifacts/dummy_dataframe.xlsm
Binary file not shown.
Binary file added tests/artifacts/dummy_dataframe.xlsx
Binary file not shown.
1 change: 1 addition & 0 deletions tests/artifacts/dummy_dict.geojson
@@ -0,0 +1 @@
{"type": "FeatureCollection", "features": [{"id": "0", "type": "Feature", "properties": {"test": "name1"}, "geometry": {"type": "Point", "coordinates": [0, 0]}}], "crs": {"type": "name", "properties": {"name": "urn:ogc:def:crs:EPSG::3857"}}}