From 7ca3a9b8fa5462108a545f904be22bda4a0bf62b Mon Sep 17 00:00:00 2001 From: Mike Kittridge Date: Fri, 4 Aug 2023 13:13:29 -0600 Subject: [PATCH 1/3] cleaned up --- tethysts/main.py | 2 +- tethysts/utils.py | 474 ---------------------------------------------- 2 files changed, 1 insertion(+), 475 deletions(-) diff --git a/tethysts/main.py b/tethysts/main.py index de7259c..cdeff14 100644 --- a/tethysts/main.py +++ b/tethysts/main.py @@ -448,7 +448,7 @@ def get_results(self, lon : float or None See lat description. distance : float or None - See lat description. + See lat description. This should be in decimal degrees not meters. from_date : str, Timestamp, datetime, or None The start date of the selection. to_date : str, Timestamp, datetime, or None diff --git a/tethysts/utils.py b/tethysts/utils.py index 04a28ac..7d4db7f 100644 --- a/tethysts/utils.py +++ b/tethysts/utils.py @@ -1,6 +1,5 @@ """ - """ import io import numpy as np @@ -77,52 +76,6 @@ def create_public_s3_url(base_url, bucket, obj_key): return key -# class ResponseStream(object): -# """ -# In many applications, you'd like to access a requests response as a file-like object, simply having .read(), .seek(), and .tell() as normal. Especially when you only want to partially download a file, it'd be extra convenient if you could use a normal file interface for it, loading as needed. - -# This is a wrapper class for doing that. Only bytes you request will be loaded - see the example in the gist itself. - -# https://gist.github.com/obskyr/b9d4b4223e7eaf4eedcd9defabb34f13 -# """ -# def __init__(self, request_iterator): -# self._bytes = BytesIO() -# self._iterator = request_iterator - -# def _load_all(self): -# self._bytes.seek(0, SEEK_END) -# for chunk in self._iterator: -# self._bytes.write(chunk) - -# def _load_until(self, goal_position): -# current_position = self._bytes.seek(0, SEEK_END) -# while current_position < goal_position: -# try: -# current_position += self._bytes.write(next(self._iterator)) -# except StopIteration: -# break - -# def tell(self): -# return self._bytes.tell() - -# def read(self, size=None): -# left_off_at = self._bytes.tell() -# if size is None: -# self._load_all() -# else: -# goal_position = left_off_at + size -# self._load_until(goal_position) - -# self._bytes.seek(left_off_at) -# return self._bytes.read(size) - -# def seek(self, position, whence=SEEK_SET): -# if whence == SEEK_END: -# self._load_all() -# else: -# self._bytes.seek(position, whence) - - def cartesian_product(*arrays): la = len(arrays) dtype = np.result_type(*arrays) @@ -163,12 +116,6 @@ def get_intersected_stations(stns, geom_query): stn_ids = [stn_ids_list[r] for r in res_index] - # res_ids = [r.wkb_hex for r in res] - - # stn_id_dict = {shape(s['geometry']).wkb_hex: i for i, s in stns.items()} - - # stn_ids = [stn_id_dict[r] for r in res_ids] - return stn_ids @@ -234,39 +181,6 @@ def get_nearest_from_extent(data, return data1 -# def read_pkl_zstd(obj, unpickle=False): -# """ -# Deserializer from a pickled object compressed with zstandard. - -# Parameters -# ---------- -# obj : bytes or str -# Either a bytes object that has been pickled and compressed or a str path to the file object. -# unpickle : bool -# Should the bytes object be unpickled or left as bytes? 
- -# Returns -# ------- -# Python object -# """ -# if isinstance(obj, str): -# with open(obj, 'rb') as p: -# dctx = zstd.ZstdDecompressor() -# with dctx.stream_reader(p) as reader: -# obj1 = reader.read() - -# elif isinstance(obj, bytes): -# dctx = zstd.ZstdDecompressor() -# obj1 = dctx.decompress(obj) -# else: -# raise TypeError('obj must either be a str path or a bytes object') - -# if unpickle: -# obj1 = pickle.loads(obj1) - -# return obj1 - - def read_json_zstd(obj): """ Deserializer from a compressed zstandard json object to a dictionary. @@ -296,133 +210,6 @@ def read_json_zstd(obj): return dict1 -# def s3_client(connection_config: dict, max_pool_connections: int = 30): -# """ -# Function to establish a client connection with an S3 account. This can use the legacy connect (signature_version s3) and the curent version. - -# Parameters -# ---------- -# connection_config : dict -# A dictionary of the connection info necessary to establish an S3 connection. It should contain service_name, endpoint_url, aws_access_key_id, and aws_secret_access_key. connection_config can also be a URL to a public S3 bucket. -# max_pool_connections : int -# The number of simultaneous connections for the S3 connection. - -# Returns -# ------- -# S3 client object -# """ -# ## Validate config -# _ = tdm.base.ConnectionConfig(**connection_config) - -# s3_config = copy.deepcopy(connection_config) - -# if 'config' in s3_config: -# config0 = s3_config.pop('config') -# config0.update({'max_pool_connections': max_pool_connections}) -# config1 = boto3.session.Config(**config0) - -# s3_config1 = s3_config.copy() -# s3_config1.update({'config': config1}) - -# s3 = boto3.client(**s3_config1) -# else: -# s3_config.update({'config': botocore.config.Config(max_pool_connections=max_pool_connections)}) -# s3 = boto3.client(**s3_config) - -# return s3 - - -# def get_object_s3(obj_key: str, bucket: str, s3: botocore.client.BaseClient = None, connection_config: dict = None, public_url: HttpUrl=None, version_id=None, range_start: int=None, range_end: int=None, chunk_size=524288, counter=5): -# """ -# General function to get an object from an S3 bucket. One of s3, connection_config, or public_url must be used. - -# Parameters -# ---------- -# obj_key : str -# The object key in the S3 bucket. -# s3 : botocore.client.BaseClient -# An S3 client object created via the s3_client function. -# connection_config : dict -# A dictionary of the connection info necessary to establish an S3 connection. It should contain service_name, s3, endpoint_url, aws_access_key_id, and aws_secret_access_key. -# public_url : str -# A URL to a public S3 bucket. This is generally only used for Backblaze object storage. -# bucket : str -# The bucket name. -# counter : int -# Number of times to retry to get the object. - -# Returns -# ------- -# bytes -# bytes object of the S3 object. 
-# """ -# counter1 = counter - -# transport_params = {'buffer_size': chunk_size} - -# if isinstance(version_id, str): -# transport_params['version_id'] = version_id - -# ## Headers -# headers = {} -# # Range -# range_dict = {} - -# if range_start is not None: -# range_dict['start'] = str(range_start) -# else: -# range_dict['start'] = '' - -# if range_end is not None: -# range_dict['end'] = str(range_end) -# else: -# range_dict['end'] = '' - -# ## Get the object -# while True: -# try: -# if isinstance(public_url, str) and (version_id is None): -# url = create_public_s3_url(public_url, bucket, obj_key) - -# if range_dict: -# range1 = 'bytes={start}-{end}'.format(**range_dict) -# headers['Range'] = range1 - -# file_obj = smart_open.open(url, 'rb', headers=headers, transport_params=transport_params) - -# elif isinstance(s3, botocore.client.BaseClient) or isinstance(connection_config, dict): -# if range_dict: -# range1 = 'bytes={start}-{end}'.format(**range_dict) -# transport_params.update({'client_kwargs': {'S3.Client.get_object': {'Range': range1}}}) - -# if s3 is None: -# _ = tdm.base.ConnectionConfig(**connection_config) - -# s3 = s3_client(connection_config) - -# s3_url = s3_url_base.format(bucket=bucket, key=obj_key) -# transport_params['client'] = s3 - -# file_obj = smart_open.open(s3_url, 'rb', transport_params=transport_params) - -# else: -# raise TypeError('One of s3, connection_config, or public_url needs to be correctly defined.') - -# break -# except: -# # print(traceback.format_exc()) -# if counter1 == 0: -# # raise ValueError('Could not properly download the object after several tries') -# print('Object could not be downloaded.') -# return None -# else: -# # print('Could not properly extract the object; trying again in 5 seconds') -# counter1 = counter1 - 1 -# sleep(3) - -# return file_obj - - def chunk_filters(results_chunks, stn_ids, time_interval=None, from_date=None, to_date=None, heights=None, bands=None, from_mod_date=None, to_mod_date=None): """ @@ -491,40 +278,6 @@ def chunk_filters(results_chunks, stn_ids, time_interval=None, from_date=None, t return rc2 -# def remove_char_dim_names(data): -# """ - -# """ -# for v in data.variables: -# if 'char_dim_name' in data[v].encoding: -# _ = data[v].encoding.pop('char_dim_name') - -# return data - - -# def remove_results_junk(data): -# """ - -# """ -# chunk_vars = [v for v in list(data.variables) if 'chunk' in v] -# if chunk_vars: -# data = data.drop_vars(chunk_vars) - -# # crap_vars = [v for v in list(data.variables) if 'string' in v] -# # if crap_vars: -# # data = data.drop_vars(crap_vars) - -# # if 'geometry' in data.dims: -# # stn_vars = [v for v in list(data.data_vars) if ('time' not in data[v].dims) and (v not in ['station_id', 'lon', 'lat'])] -# # data = data.drop_vars(stn_vars) - -# if 'station_geometry' in data.dims: -# stn_vars = [d for d in data.variables if 'station_geometry' in data[d].dims] -# data = data.drop_vars(stn_vars) - -# return data - - def result_filters(h5, from_date=None, to_date=None): """ @@ -547,194 +300,6 @@ def result_filters(h5, from_date=None, to_date=None): return h5 -# def process_results_output(ts_xr, output='xarray', squeeze_dims=False): -# """ - -# """ -# ## Return -# if squeeze_dims: -# ts_xr = ts_xr.squeeze() - -# if output == 'xarray': -# return ts_xr - -# elif output == 'dict': -# data_dict = ts_xr.to_dict() - -# return data_dict - -# elif output == 'json': -# json1 = orjson.dumps(ts_xr.to_dict(), option=orjson.OPT_OMIT_MICROSECONDS | orjson.OPT_SERIALIZE_NUMPY) - -# return json1 -# else: 
-# raise ValueError("output must be one of 'xarray', 'dict', or 'json'") - - -# def read_in_chunks(file_object, chunk_size=524288): -# while True: -# data = file_object.read(chunk_size) -# if not data: -# break -# yield data - - -# def local_file_byte_iterator(path, chunk_size=DEFAULT_BUFFER_SIZE): -# """given a path, return an iterator over the file -# that lazily loads the file. -# https://stackoverflow.com/a/37222446/6952674 -# """ -# path = pathlib.Path(path) -# with path.open('rb') as file: -# reader = partial(file.read1, DEFAULT_BUFFER_SIZE) -# file_iterator = iter(reader, bytes()) -# for chunk in file_iterator: -# yield from chunk - - -# def stream_to_file(file_obj, file_path, chunk_size=524288): -# """ - -# """ -# file_path1 = pathlib.Path(file_path) -# file_path1.parent.mkdir(parents=True, exist_ok=True) - -# with open(file_path1, 'wb') as f: -# chunk = file_obj.read(chunk_size) -# while chunk: -# f.write(chunk) -# chunk = file_obj.read(chunk_size) - - -# def decompress_stream_to_file(file_obj, file_path, chunk_size=524288): -# """ - -# """ -# file_path1 = pathlib.Path(file_path) -# file_path1.parent.mkdir(parents=True, exist_ok=True) - -# if file_path1.suffix == '.zst': -# file_path2 = file_path1.stem -# dctx = zstd.ZstdDecompressor() - -# with open(file_path2, 'wb') as f: -# dctx.copy_stream(file_obj, f, read_size=chunk_size, write_size=chunk_size) - -# elif file_path1.suffix == '.gz': -# file_path2 = file_path1.stem - -# with gzip.open(file_obj, 'rb') as s_file, open(file_path2, 'wb') as d_file: -# shutil.copyfileobj(s_file, d_file, chunk_size) - -# else: -# file_path2 = file_path1 -# stream_to_file(file_obj, file_path2, chunk_size) - -# return str(file_path2) - - -# def decompress_stream_to_object(file_obj, compression, chunk_size=524288): -# """ - -# """ -# b1 = BytesIO() - -# if compression == 'zstd': -# dctx = zstd.ZstdDecompressor() - -# with open(b1, 'wb') as f: -# dctx.copy_stream(file_obj, f, read_size=chunk_size, write_size=chunk_size) - -# elif compression == '.gz': - -# with gzip.open(file_obj, 'rb') as s_file, open(b1, 'wb') as d_file: -# shutil.copyfileobj(s_file, d_file, chunk_size) - -# else: -# with open(b1, 'wb') as f: -# chunk = file_obj.read(chunk_size) -# while chunk: -# f.write(chunk) -# chunk = file_obj.read(chunk_size) - -# return b1 - - -# def url_stream_to_file(url, file_path, decompress=False, chunk_size=524288): -# """ - -# """ -# file_path1 = pathlib.Path(file_path) -# if file_path1.is_dir(): -# file_name = url.split('/')[-1] -# file_path2 = str(file_path1.joinpath(file_name)) -# else: -# file_path2 = file_path - -# base_path = os.path.split(file_path2)[0] -# os.makedirs(base_path, exist_ok=True) - -# counter = 4 -# while True: -# try: -# with requests.get(url, stream=True, timeout=300) as r: -# r.raise_for_status() -# stream = ResponseStream(r.iter_content(chunk_size)) - -# if decompress: -# if str(file_path2).endswith('.zst'): -# file_path2 = os.path.splitext(file_path2)[0] -# dctx = zstd.ZstdDecompressor() - -# with open(file_path2, 'wb') as f: -# dctx.copy_stream(stream, f, read_size=chunk_size, write_size=chunk_size) - -# elif str(file_path2).endswith('.gz'): -# file_path2 = os.path.splitext(file_path2)[0] - -# with gzip.open(stream, 'rb') as s_file, open(file_path2, 'wb') as d_file: -# shutil.copyfileobj(s_file, d_file, chunk_size) - -# else: -# with open(file_path2, 'wb') as f: -# chunk = stream.read(chunk_size) -# while chunk: -# f.write(chunk) -# chunk = stream.read(chunk_size) -# else: -# with open(file_path2, 'wb') as f: -# chunk = 
stream.read(chunk_size) -# while chunk: -# f.write(chunk) -# chunk = stream.read(chunk_size) - -# break - -# except Exception as err: -# if counter < 1: -# raise err -# else: -# counter = counter - 1 -# sleep(5) - -# return file_path2 - - -# def process_dataset(data, from_date=None, to_date=None): -# """ -# Stupid xarray being inefficient at parsing file objects... -# """ -# ## Remove junk fields -# h1 = H5(data).sel(exclude_coords=['station_geometry', 'chunk_date']) - -# h2 = result_filters(h1, from_date, to_date) - -# data_obj = io.BytesIO() -# h2.to_hdf5(data_obj) - -# return data_obj - - def download_results(chunk: dict, bucket: str, s3: botocore.client.BaseClient = None, connection_config: dict = None, public_url: HttpUrl = None, cache: Union[pathlib.Path] = None, from_date=None, to_date=None, return_raw=False): """ @@ -893,42 +458,3 @@ def results_concat(results_list, output_path=None, from_date=None, to_date=None, xr3 = filter_mod_dates(xr3, from_mod_date, to_mod_date) return xr3 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - From e7afe84a9171f0c3dd8a27181c728031c40ec1ba Mon Sep 17 00:00:00 2001 From: Mike Kittridge Date: Fri, 18 Aug 2023 11:08:54 -0600 Subject: [PATCH 2/3] added back a util function --- conda/meta.yaml | 4 ++-- setup.py | 4 ++-- tethysts/utils.py | 34 ++++++++++++++++++++++++++++++++++ 3 files changed, 38 insertions(+), 4 deletions(-) diff --git a/conda/meta.yaml b/conda/meta.yaml index e0e75d6..0001bb5 100644 --- a/conda/meta.yaml +++ b/conda/meta.yaml @@ -1,5 +1,5 @@ {% set name = "tethysts" %} -{% set version = "4.5.13" %} +{% set version = "4.5.14" %} # {% set sha256 = "ae2cc83fb5a75e8dc3e1b2c2137deea412c8a4c7c9acca52bf4ec59de52a80c9" %} # sha256 is the prefered checksum -- you can get it for a file with: @@ -38,7 +38,7 @@ requirements: - python >=3.8 - pandas<2 - tethys-data-models=0.4.11 - - hdf5tools>=0.1.14 + - hdf5tools>=0.2.3 - s3tethys>=0.0.8 - xarray>=2022.6.0 - pydantic=1.10 diff --git a/setup.py b/setup.py index 25a174f..c43b537 100644 --- a/setup.py +++ b/setup.py @@ -10,7 +10,7 @@ name = 'tethysts' main_package = 'tethysts' datasets = 'datasets/time_series' -version = '4.5.13' +version = '4.5.14' descrip = 'tethys time series S3 extraction' # The below code is for readthedocs. To have sphinx/readthedocs interact with @@ -19,7 +19,7 @@ if os.environ.get('READTHEDOCS', False) == 'True': INSTALL_REQUIRES = [] else: - INSTALL_REQUIRES = ['zstandard', 'pandas<2', 'xarray>=2022.6.0', 'scipy', 'orjson', 'requests', 'shapely>=2.0.1', 'tethys-data-models>=0.4.11', 'hdf5tools>=0.1.14', 's3tethys>=0.0.8', 'pydantic==1.10', 'h5netcdf>=1.1.0'] + INSTALL_REQUIRES = ['zstandard', 'pandas<2', 'xarray>=2022.6.0', 'scipy', 'orjson', 'requests', 'shapely>=2.0.1', 'tethys-data-models>=0.4.11', 'hdf5tools>=0.2.3', 's3tethys>=0.0.8', 'pydantic==1.10', 'h5netcdf>=1.1.0'] # Get the long description from the README file with open(os.path.join(here, 'README.rst'), encoding='utf-8') as f: diff --git a/tethysts/utils.py b/tethysts/utils.py index 7d4db7f..5f91190 100644 --- a/tethysts/utils.py +++ b/tethysts/utils.py @@ -9,6 +9,7 @@ from datetime import datetime import zstandard as zstd import copy +import pickle import botocore from shapely.geometry import shape, Polygon, Point from shapely.strtree import STRtree @@ -36,6 +37,39 @@ ### Helper functions +def read_pkl_zstd(obj, unpickle=False): + """ + Deserializer from a pickled object compressed with zstandard. 
+ + Parameters + ---------- + obj : bytes or str + Either a bytes object that has been pickled and compressed or a str path to the file object. + unpickle : bool + Should the bytes object be unpickled or left as bytes? + + Returns + ------- + Python object + """ + if isinstance(obj, str): + with open(obj, 'rb') as p: + dctx = zstd.ZstdDecompressor() + with dctx.stream_reader(p) as reader: + obj1 = reader.read() + + elif isinstance(obj, bytes): + dctx = zstd.ZstdDecompressor() + obj1 = dctx.decompress(obj) + else: + raise TypeError('obj must either be a str path or a bytes object') + + if unpickle: + obj1 = pickle.loads(obj1) + + return obj1 + + def update_nested(in_dict, ds_id, version_date, value): """ From 407c37f7ccd9f0ea8171a9f3bebb61889481f1d0 Mon Sep 17 00:00:00 2001 From: Mike Kittridge Date: Fri, 18 Aug 2023 11:45:47 -0600 Subject: [PATCH 3/3] updated package deps --- conda/meta.yaml | 4 ++-- setup.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/conda/meta.yaml b/conda/meta.yaml index 0001bb5..ecc11c2 100644 --- a/conda/meta.yaml +++ b/conda/meta.yaml @@ -1,5 +1,5 @@ {% set name = "tethysts" %} -{% set version = "4.5.14" %} +{% set version = "4.5.15" %} # {% set sha256 = "ae2cc83fb5a75e8dc3e1b2c2137deea412c8a4c7c9acca52bf4ec59de52a80c9" %} # sha256 is the prefered checksum -- you can get it for a file with: @@ -38,7 +38,7 @@ requirements: - python >=3.8 - pandas<2 - tethys-data-models=0.4.11 - - hdf5tools>=0.2.3 + - hdf5tools>=0.2.4 - s3tethys>=0.0.8 - xarray>=2022.6.0 - pydantic=1.10 diff --git a/setup.py b/setup.py index c43b537..1b86052 100644 --- a/setup.py +++ b/setup.py @@ -10,7 +10,7 @@ name = 'tethysts' main_package = 'tethysts' datasets = 'datasets/time_series' -version = '4.5.14' +version = '4.5.15' descrip = 'tethys time series S3 extraction' # The below code is for readthedocs. To have sphinx/readthedocs interact with @@ -19,7 +19,7 @@ if os.environ.get('READTHEDOCS', False) == 'True': INSTALL_REQUIRES = [] else: - INSTALL_REQUIRES = ['zstandard', 'pandas<2', 'xarray>=2022.6.0', 'scipy', 'orjson', 'requests', 'shapely>=2.0.1', 'tethys-data-models>=0.4.11', 'hdf5tools>=0.2.3', 's3tethys>=0.0.8', 'pydantic==1.10', 'h5netcdf>=1.1.0'] + INSTALL_REQUIRES = ['zstandard', 'pandas<2', 'xarray>=2022.6.0', 'scipy', 'orjson', 'requests', 'shapely>=2.0.1', 'tethys-data-models>=0.4.11', 'hdf5tools>=0.2.4', 's3tethys>=0.0.8', 'pydantic==1.10', 'h5netcdf>=1.1.0'] # Get the long description from the README file with open(os.path.join(here, 'README.rst'), encoding='utf-8') as f:
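
For reference, a minimal round-trip sketch of the ``read_pkl_zstd`` helper restored in PATCH 2/3. The import path ``tethysts.utils`` and the sample payload are assumptions for illustration; only the bytes branch of the helper is exercised here, not the file-path branch::

    import pickle
    import zstandard as zstd

    from tethysts.utils import read_pkl_zstd

    # Illustrative payload only; any picklable Python object works.
    payload = {'station_id': 'abc123', 'values': [1.2, 3.4, 5.6]}

    # Pickle, then compress with zstandard, matching what the helper expects.
    compressed = zstd.ZstdCompressor().compress(pickle.dumps(payload))

    # unpickle=False returns the decompressed bytes unchanged.
    raw_bytes = read_pkl_zstd(compressed, unpickle=False)
    assert pickle.loads(raw_bytes) == payload

    # unpickle=True loads the pickled object directly.
    assert read_pkl_zstd(compressed, unpickle=True) == payload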