From fc3327819a838e8fc44840d7d28d18ada0b234d0 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Mon, 15 Jul 2024 12:05:14 +0200 Subject: [PATCH 01/53] Fixed Geokube Version --- drivers/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/drivers/Dockerfile b/drivers/Dockerfile index d94bd0c..4920f4e 100644 --- a/drivers/Dockerfile +++ b/drivers/Dockerfile @@ -1,6 +1,6 @@ ARG REGISTRY=rg.fr-par.scw.cloud/geokube # ARG TAG=v0.2.6b1 -ARG TAG=latest +ARG TAG=2024.05.03.10.36 FROM $REGISTRY/geokube:$TAG COPY dist/intake_geokube-0.1a0-py3-none-any.whl / From af6884ccd492887fc78ac12a6d6115113e724804 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Mon, 15 Jul 2024 12:05:43 +0200 Subject: [PATCH 02/53] fix bug on dataset with no cache --- datastore/datastore/datastore.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datastore/datastore/datastore.py b/datastore/datastore/datastore.py index 2525c02..b754fc0 100644 --- a/datastore/datastore/datastore.py +++ b/datastore/datastore/datastore.py @@ -50,7 +50,7 @@ def __init__(self) -> None: @log_execution_time(_LOG) def get_cached_product_or_read( - self, dataset_id: str, product_id: str, query: GeoQuery | None = None + self, dataset_id: str, product_id: str, ) -> DataCube | Dataset: """Get product from the cache instead of loading files indicated in the catalog if `metadata_caching` set to `True`. @@ -81,7 +81,7 @@ def get_cached_product_or_read( ) return self.catalog(CACHE_DIR=self.cache_dir)[dataset_id][ product_id - ].get(geoquery=query, compute=False).read_chunked() + ].read_chunked() return self.cache[dataset_id][product_id] @log_execution_time(_LOG) @@ -389,7 +389,7 @@ def estimate( # NOTE: we always use catalog directly and single product cache self._LOG.debug("loading product...") # NOTE: for estimation we use cached products - kube = self.get_cached_product_or_read(dataset_id, product_id, query=query) + kube = self.get_cached_product_or_read(dataset_id, product_id) self._LOG.debug("original kube len: %s", len(kube)) return Datastore._process_query(kube, geoquery, False).nbytes From 8e10e758d75118689436e2263ee24b08d5f50050 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Mon, 15 Jul 2024 12:06:05 +0200 Subject: [PATCH 03/53] adding new driver for Active Fire Monitoring dataset --- drivers/intake_geokube/afm.py | 71 +++++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) create mode 100644 drivers/intake_geokube/afm.py diff --git a/drivers/intake_geokube/afm.py b/drivers/intake_geokube/afm.py new file mode 100644 index 0000000..3f20cb0 --- /dev/null +++ b/drivers/intake_geokube/afm.py @@ -0,0 +1,71 @@ +"""geokube driver for intake.""" +import logging +from functools import partial +from typing import Any, Mapping, Optional, Union + +import numpy as np +import xarray as xr + +from .base import GeokubeSource +from geokube import open_datacube, open_dataset + + +def preprocess_afm(dset: xr.Dataset) -> xr.Dataset: + return dset.to_dataframe().set_index(['lat', 'lon'], append=True).to_xarray() + + +class CMCCAFMSource(GeokubeSource): + name = "cmcc_afm_geokube" + + def __init__( + self, + path: str, + pattern: str = None, + field_id: str = None, + delay_read_cubes: bool = False, + metadata_caching: bool = False, + metadata_cache_path: str = None, + storage_options: dict = None, + xarray_kwargs: dict = None, + metadata=None, + mapping: Optional[Mapping[str, Mapping[str, str]]] = None, + load_files_on_persistance: Optional[bool] = True, + ): + self._kube = None + self.path = 
path + self.pattern = pattern + self.field_id = field_id + self.delay_read_cubes = delay_read_cubes + self.metadata_caching = metadata_caching + self.metadata_cache_path = metadata_cache_path + self.storage_options = storage_options + self.mapping = mapping + self.xarray_kwargs = {} if xarray_kwargs is None else xarray_kwargs + self.load_files_on_persistance = load_files_on_persistance + self.preprocess = preprocess_afm + super(CMCCAFMSource, self).__init__(metadata=metadata) + + def _open_dataset(self): + if self.pattern is None: + self._kube = open_datacube( + path=self.path, + id_pattern=self.field_id, + metadata_caching=self.metadata_caching, + metadata_cache_path=self.metadata_cache_path, + mapping=self.mapping, + **self.xarray_kwargs, + preprocess=self.preprocess, + ) + else: + self._kube = open_dataset( + path=self.path, + pattern=self.pattern, + id_pattern=self.field_id, + delay_read_cubes=self.delay_read_cubes, + metadata_caching=self.metadata_caching, + metadata_cache_path=self.metadata_cache_path, + mapping=self.mapping, + **self.xarray_kwargs, + preprocess=self.preprocess, + ) + return self._kube From 1d540211dca655abc9e66744adf6f01eb1309aac Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Mon, 15 Jul 2024 12:48:10 +0200 Subject: [PATCH 04/53] adding driver to setup.py --- drivers/setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/drivers/setup.py b/drivers/setup.py index c99c224..b3a3032 100644 --- a/drivers/setup.py +++ b/drivers/setup.py @@ -18,6 +18,7 @@ "intake.drivers": [ "geokube_netcdf = intake_geokube.netcdf:NetCDFSource", "cmcc_wrf_geokube = intake_geokube.wrf:CMCCWRFSource", + "cmcc_afm_geokube = intake_geokube.afm:CMCCAFMSource", ] }, classifiers=[ From 946e0f2f53cc78400d7a7f327c916fcdabeca7fd Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Mon, 15 Jul 2024 16:04:30 +0200 Subject: [PATCH 05/53] changing from preprocess to post process data --- drivers/intake_geokube/afm.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/drivers/intake_geokube/afm.py b/drivers/intake_geokube/afm.py index 3f20cb0..ca91974 100644 --- a/drivers/intake_geokube/afm.py +++ b/drivers/intake_geokube/afm.py @@ -9,9 +9,15 @@ from .base import GeokubeSource from geokube import open_datacube, open_dataset +from geokube.core.datacube import DataCube -def preprocess_afm(dset: xr.Dataset) -> xr.Dataset: - return dset.to_dataframe().set_index(['lat', 'lon'], append=True).to_xarray() + +def postprocess_afm(dset: xr.Dataset) -> xr.Dataset: + latitude = dset['lat'].values + longitude = dset['lon'].values + dset = dset.drop('lat') + dset = dset.drop('lon') + return dset.expand_dims(dim={"lat": latitude, "lon": longitude}, axis=(1, 0)) class CMCCAFMSource(GeokubeSource): @@ -42,7 +48,7 @@ def __init__( self.mapping = mapping self.xarray_kwargs = {} if xarray_kwargs is None else xarray_kwargs self.load_files_on_persistance = load_files_on_persistance - self.preprocess = preprocess_afm + #self.preprocess = preprocess_afm super(CMCCAFMSource, self).__init__(metadata=metadata) def _open_dataset(self): @@ -54,7 +60,6 @@ def _open_dataset(self): metadata_cache_path=self.metadata_cache_path, mapping=self.mapping, **self.xarray_kwargs, - preprocess=self.preprocess, ) else: self._kube = open_dataset( @@ -66,6 +71,7 @@ def _open_dataset(self): metadata_cache_path=self.metadata_cache_path, mapping=self.mapping, **self.xarray_kwargs, - preprocess=self.preprocess, ) + ds = postprocess_afm(self._kube.to_xarray()) + self._kube = Datacube.from_xarray(ds) return 
self._kube From 7145d8b37bc68566291bc6a8b3b66792e12259cc Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Mon, 15 Jul 2024 16:17:27 +0200 Subject: [PATCH 06/53] fix typo in DataCube --- drivers/intake_geokube/afm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/drivers/intake_geokube/afm.py b/drivers/intake_geokube/afm.py index ca91974..4b392a6 100644 --- a/drivers/intake_geokube/afm.py +++ b/drivers/intake_geokube/afm.py @@ -73,5 +73,5 @@ def _open_dataset(self): **self.xarray_kwargs, ) ds = postprocess_afm(self._kube.to_xarray()) - self._kube = Datacube.from_xarray(ds) + self._kube = DataCube.from_xarray(ds) return self._kube From f89ccbca0670a18526256c6e9c2bf5e604fb4b20 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Wed, 17 Jul 2024 15:21:09 +0200 Subject: [PATCH 07/53] adding sort into driver --- drivers/intake_geokube/afm.py | 34 ++++++++++++---------------------- 1 file changed, 12 insertions(+), 22 deletions(-) diff --git a/drivers/intake_geokube/afm.py b/drivers/intake_geokube/afm.py index 4b392a6..a636ccb 100644 --- a/drivers/intake_geokube/afm.py +++ b/drivers/intake_geokube/afm.py @@ -17,7 +17,7 @@ def postprocess_afm(dset: xr.Dataset) -> xr.Dataset: longitude = dset['lon'].values dset = dset.drop('lat') dset = dset.drop('lon') - return dset.expand_dims(dim={"lat": latitude, "lon": longitude}, axis=(1, 0)) + return dset.expand_dims(dim={"lat": latitude, "lon": longitude}, axis=(1, 0)).sortby('time') class CMCCAFMSource(GeokubeSource): @@ -52,26 +52,16 @@ def __init__( super(CMCCAFMSource, self).__init__(metadata=metadata) def _open_dataset(self): - if self.pattern is None: - self._kube = open_datacube( - path=self.path, - id_pattern=self.field_id, - metadata_caching=self.metadata_caching, - metadata_cache_path=self.metadata_cache_path, - mapping=self.mapping, - **self.xarray_kwargs, + self._kube = DataCube.from_xarray( + postprocess_afm( + open_datacube( + path=self.path, + id_pattern=self.field_id, + metadata_caching=self.metadata_caching, + metadata_cache_path=self.metadata_cache_path, + mapping=self.mapping, + **self.xarray_kwargs, + ).to_xarray() ) - else: - self._kube = open_dataset( - path=self.path, - pattern=self.pattern, - id_pattern=self.field_id, - delay_read_cubes=self.delay_read_cubes, - metadata_caching=self.metadata_caching, - metadata_cache_path=self.metadata_cache_path, - mapping=self.mapping, - **self.xarray_kwargs, - ) - ds = postprocess_afm(self._kube.to_xarray()) - self._kube = DataCube.from_xarray(ds) + ) return self._kube From eab48e8546fca532451f157e2b6a10d30fd25a56 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Wed, 17 Jul 2024 15:42:34 +0200 Subject: [PATCH 08/53] dropping certainty variable from dataset --- drivers/intake_geokube/afm.py | 1 + 1 file changed, 1 insertion(+) diff --git a/drivers/intake_geokube/afm.py b/drivers/intake_geokube/afm.py index a636ccb..88e4f30 100644 --- a/drivers/intake_geokube/afm.py +++ b/drivers/intake_geokube/afm.py @@ -17,6 +17,7 @@ def postprocess_afm(dset: xr.Dataset) -> xr.Dataset: longitude = dset['lon'].values dset = dset.drop('lat') dset = dset.drop('lon') + dset = dset.drop('certainty') return dset.expand_dims(dim={"lat": latitude, "lon": longitude}, axis=(1, 0)).sortby('time') From a52824264b6b7d97c4dc9f9cb8cc44cb4118e4cd Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Wed, 17 Jul 2024 15:52:52 +0200 Subject: [PATCH 09/53] removing postprocess call --- drivers/intake_geokube/afm.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/drivers/intake_geokube/afm.py b/drivers/intake_geokube/afm.py index 88e4f30..987690b 100644 --- a/drivers/intake_geokube/afm.py +++ b/drivers/intake_geokube/afm.py @@ -54,7 +54,7 @@ def __init__( def _open_dataset(self): self._kube = DataCube.from_xarray( - postprocess_afm( + #postprocess_afm( open_datacube( path=self.path, id_pattern=self.field_id, @@ -63,6 +63,6 @@ def _open_dataset(self): mapping=self.mapping, **self.xarray_kwargs, ).to_xarray() - ) + #) ) return self._kube From 6e14a4a8a774d5fa253b234003113ffa0716da82 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Wed, 17 Jul 2024 16:11:07 +0200 Subject: [PATCH 10/53] doing only sort in post process --- drivers/intake_geokube/afm.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/drivers/intake_geokube/afm.py b/drivers/intake_geokube/afm.py index 987690b..f8afe0f 100644 --- a/drivers/intake_geokube/afm.py +++ b/drivers/intake_geokube/afm.py @@ -13,12 +13,13 @@ def postprocess_afm(dset: xr.Dataset) -> xr.Dataset: - latitude = dset['lat'].values - longitude = dset['lon'].values - dset = dset.drop('lat') - dset = dset.drop('lon') - dset = dset.drop('certainty') - return dset.expand_dims(dim={"lat": latitude, "lon": longitude}, axis=(1, 0)).sortby('time') + #latitude = dset['lat'].values + #longitude = dset['lon'].values + #dset = dset.drop('lat') + #dset = dset.drop('lon') + #dset = dset.drop('certainty') + #return dset.expand_dims(dim={"lat": latitude, "lon": longitude}, axis=(1, 0)).sortby('time') + return dset.sortby('time') class CMCCAFMSource(GeokubeSource): @@ -54,7 +55,7 @@ def __init__( def _open_dataset(self): self._kube = DataCube.from_xarray( - #postprocess_afm( + postprocess_afm( open_datacube( path=self.path, id_pattern=self.field_id, @@ -63,6 +64,6 @@ def _open_dataset(self): mapping=self.mapping, **self.xarray_kwargs, ).to_xarray() - #) + ) ) return self._kube From 15772cd639f4bbfb78ec18152ae4ea8d5537b919 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Wed, 17 Jul 2024 16:23:02 +0200 Subject: [PATCH 11/53] check if values explode memory --- drivers/intake_geokube/afm.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/drivers/intake_geokube/afm.py b/drivers/intake_geokube/afm.py index f8afe0f..3d0d0d1 100644 --- a/drivers/intake_geokube/afm.py +++ b/drivers/intake_geokube/afm.py @@ -13,11 +13,10 @@ def postprocess_afm(dset: xr.Dataset) -> xr.Dataset: - #latitude = dset['lat'].values - #longitude = dset['lon'].values - #dset = dset.drop('lat') - #dset = dset.drop('lon') - #dset = dset.drop('certainty') + latitude = dset['lat'].values + longitude = dset['lon'].values + dset = dset.drop('lat') + dset = dset.drop('lon') #return dset.expand_dims(dim={"lat": latitude, "lon": longitude}, axis=(1, 0)).sortby('time') return dset.sortby('time') From 5c4cea86c7a02dff93eb5847c4291a785c8333b0 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Wed, 17 Jul 2024 16:33:47 +0200 Subject: [PATCH 12/53] expanding only lat --- drivers/intake_geokube/afm.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/drivers/intake_geokube/afm.py b/drivers/intake_geokube/afm.py index 3d0d0d1..5e80b00 100644 --- a/drivers/intake_geokube/afm.py +++ b/drivers/intake_geokube/afm.py @@ -17,8 +17,8 @@ def postprocess_afm(dset: xr.Dataset) -> xr.Dataset: longitude = dset['lon'].values dset = dset.drop('lat') dset = dset.drop('lon') - #return dset.expand_dims(dim={"lat": latitude, "lon": longitude}, axis=(1, 0)).sortby('time') - return dset.sortby('time') + dset 
= dset.sortby('time') + return dset.expand_dims(dim={"lat": latitude}, axis=0) #, "lon": longitude}, axis=(1, 0)) class CMCCAFMSource(GeokubeSource): From 25d07d10c1fa8fc1b0c27f362358047d028d2e8b Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Wed, 17 Jul 2024 16:41:53 +0200 Subject: [PATCH 13/53] expanding longitude dim --- drivers/intake_geokube/afm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/drivers/intake_geokube/afm.py b/drivers/intake_geokube/afm.py index 5e80b00..63353cf 100644 --- a/drivers/intake_geokube/afm.py +++ b/drivers/intake_geokube/afm.py @@ -18,7 +18,7 @@ def postprocess_afm(dset: xr.Dataset) -> xr.Dataset: dset = dset.drop('lat') dset = dset.drop('lon') dset = dset.sortby('time') - return dset.expand_dims(dim={"lat": latitude}, axis=0) #, "lon": longitude}, axis=(1, 0)) + return dset.expand_dims(dim={"lat": latitude}, axis=0).expand_dims(dim={"lon": longitude}, axis=0) class CMCCAFMSource(GeokubeSource): From 0402069e3ed13dcff5c19542a8908be579def078 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Wed, 17 Jul 2024 16:50:39 +0200 Subject: [PATCH 14/53] breaking the expand dim operation into two steps --- drivers/intake_geokube/afm.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/drivers/intake_geokube/afm.py b/drivers/intake_geokube/afm.py index 63353cf..5792e71 100644 --- a/drivers/intake_geokube/afm.py +++ b/drivers/intake_geokube/afm.py @@ -18,7 +18,9 @@ def postprocess_afm(dset: xr.Dataset) -> xr.Dataset: dset = dset.drop('lat') dset = dset.drop('lon') dset = dset.sortby('time') - return dset.expand_dims(dim={"lat": latitude}, axis=0).expand_dims(dim={"lon": longitude}, axis=0) + dset = dset.expand_dims(dim={"lat": latitude}, axis=0) + dset = dset.expand_dims(dim={"lon": longitude}, axis=0) + return dset class CMCCAFMSource(GeokubeSource): From b16b97c9e16b306cdcc40b4cbaa8814c53f5c427 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Fri, 19 Jul 2024 08:27:00 +0200 Subject: [PATCH 15/53] ignoring spotlight files --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 2c15e0a..1856070 100644 --- a/.gitignore +++ b/.gitignore @@ -112,3 +112,4 @@ venv.bak/ _catalogs/ _old/ +.DS_Store \ No newline at end of file From d503dbb567e07cd9f52bce4c34a42c3f7b8f9068 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Fri, 19 Jul 2024 09:00:25 +0200 Subject: [PATCH 16/53] applying chunking after expand dim --- drivers/intake_geokube/afm.py | 50 +++++++++++++++++------------------ 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/drivers/intake_geokube/afm.py b/drivers/intake_geokube/afm.py index 5792e71..e01d45a 100644 --- a/drivers/intake_geokube/afm.py +++ b/drivers/intake_geokube/afm.py @@ -12,33 +12,32 @@ from geokube.core.datacube import DataCube -def postprocess_afm(dset: xr.Dataset) -> xr.Dataset: +def preprocess_afm(dset: xr.Dataset) -> xr.Dataset: latitude = dset['lat'].values longitude = dset['lon'].values dset = dset.drop('lat') dset = dset.drop('lon') dset = dset.sortby('time') - dset = dset.expand_dims(dim={"lat": latitude}, axis=0) - dset = dset.expand_dims(dim={"lon": longitude}, axis=0) - return dset + dset = dset.expand_dims(dim={"latitude": latitude, "longitude": longitude}, axis=(0, 1)) + return dset.chunk({'time': 10, 'latitude': 50, 'longitude': 50}) class CMCCAFMSource(GeokubeSource): name = "cmcc_afm_geokube" def __init__( - self, - path: str, - pattern: str = None, - field_id: str = None, - delay_read_cubes: bool = False, - 
metadata_caching: bool = False, - metadata_cache_path: str = None, - storage_options: dict = None, - xarray_kwargs: dict = None, - metadata=None, - mapping: Optional[Mapping[str, Mapping[str, str]]] = None, - load_files_on_persistance: Optional[bool] = True, + self, + path: str, + pattern: str = None, + field_id: str = None, + delay_read_cubes: bool = False, + metadata_caching: bool = False, + metadata_cache_path: str = None, + storage_options: dict = None, + xarray_kwargs: dict = None, + metadata=None, + mapping: Optional[Mapping[str, Mapping[str, str]]] = None, + load_files_on_persistance: Optional[bool] = True, ): self._kube = None self.path = path @@ -51,20 +50,19 @@ def __init__( self.mapping = mapping self.xarray_kwargs = {} if xarray_kwargs is None else xarray_kwargs self.load_files_on_persistance = load_files_on_persistance - #self.preprocess = preprocess_afm + self.preprocess = preprocess_afm super(CMCCAFMSource, self).__init__(metadata=metadata) def _open_dataset(self): self._kube = DataCube.from_xarray( - postprocess_afm( - open_datacube( - path=self.path, - id_pattern=self.field_id, - metadata_caching=self.metadata_caching, - metadata_cache_path=self.metadata_cache_path, - mapping=self.mapping, - **self.xarray_kwargs, - ).to_xarray() + open_datacube( + path=self.path, + id_pattern=self.field_id, + metadata_caching=self.metadata_caching, + metadata_cache_path=self.metadata_cache_path, + mapping=self.mapping, + **self.xarray_kwargs, + preprocess=self.preprocess ) ) return self._kube From 40710f2306a1d2555e61506c72ae9f06314ffb5a Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Fri, 19 Jul 2024 09:14:28 +0200 Subject: [PATCH 17/53] resetting indexes to remove duplicate --- drivers/intake_geokube/afm.py | 1 + 1 file changed, 1 insertion(+) diff --git a/drivers/intake_geokube/afm.py b/drivers/intake_geokube/afm.py index e01d45a..641a2dc 100644 --- a/drivers/intake_geokube/afm.py +++ b/drivers/intake_geokube/afm.py @@ -19,6 +19,7 @@ def preprocess_afm(dset: xr.Dataset) -> xr.Dataset: dset = dset.drop('lon') dset = dset.sortby('time') dset = dset.expand_dims(dim={"latitude": latitude, "longitude": longitude}, axis=(0, 1)) + dset = dset.reset_index(dims_or_levels=['latitude','longitude','time'],drop=True) return dset.chunk({'time': 10, 'latitude': 50, 'longitude': 50}) From 80c42b993139ed294e3ef85027e3ce0695686070 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Fri, 19 Jul 2024 09:25:37 +0200 Subject: [PATCH 18/53] adding a post process function to remove duplicate coordinate and applying chunking --- drivers/intake_geokube/afm.py | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/drivers/intake_geokube/afm.py b/drivers/intake_geokube/afm.py index 641a2dc..c79945d 100644 --- a/drivers/intake_geokube/afm.py +++ b/drivers/intake_geokube/afm.py @@ -17,10 +17,13 @@ def preprocess_afm(dset: xr.Dataset) -> xr.Dataset: longitude = dset['lon'].values dset = dset.drop('lat') dset = dset.drop('lon') - dset = dset.sortby('time') dset = dset.expand_dims(dim={"latitude": latitude, "longitude": longitude}, axis=(0, 1)) - dset = dset.reset_index(dims_or_levels=['latitude','longitude','time'],drop=True) - return dset.chunk({'time': 10, 'latitude': 50, 'longitude': 50}) + return dset + + +def post_process_afm(dset: xr.Dataset) -> xr.Dataset: + return dset.reset_index(dims_or_levels=['latitude', 'longitude'], drop=True).sortby('time').chunk( + {'time': 50, 'latitude': 50, 'longitude': 50}) class CMCCAFMSource(GeokubeSource): @@ -56,14 +59,16 @@ 
def __init__( def _open_dataset(self): self._kube = DataCube.from_xarray( - open_datacube( - path=self.path, - id_pattern=self.field_id, - metadata_caching=self.metadata_caching, - metadata_cache_path=self.metadata_cache_path, - mapping=self.mapping, - **self.xarray_kwargs, - preprocess=self.preprocess + post_process_afm( + open_datacube( + path=self.path, + id_pattern=self.field_id, + metadata_caching=self.metadata_caching, + metadata_cache_path=self.metadata_cache_path, + mapping=self.mapping, + **self.xarray_kwargs, + preprocess=self.preprocess + ).to_xarray() ) ) return self._kube From 5a2f15fa269ead09fed3fb4d104639aefd8a0a9a Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Fri, 19 Jul 2024 09:46:59 +0200 Subject: [PATCH 19/53] removing duplicated indexes by sel --- drivers/intake_geokube/afm.py | 33 ++++++++++++++------------------- 1 file changed, 14 insertions(+), 19 deletions(-) diff --git a/drivers/intake_geokube/afm.py b/drivers/intake_geokube/afm.py index c79945d..44eeae0 100644 --- a/drivers/intake_geokube/afm.py +++ b/drivers/intake_geokube/afm.py @@ -12,19 +12,18 @@ from geokube.core.datacube import DataCube -def preprocess_afm(dset: xr.Dataset) -> xr.Dataset: - latitude = dset['lat'].values - longitude = dset['lon'].values - dset = dset.drop('lat') - dset = dset.drop('lon') - dset = dset.expand_dims(dim={"latitude": latitude, "longitude": longitude}, axis=(0, 1)) - return dset - - -def post_process_afm(dset: xr.Dataset) -> xr.Dataset: - return dset.reset_index(dims_or_levels=['latitude', 'longitude'], drop=True).sortby('time').chunk( - {'time': 50, 'latitude': 50, 'longitude': 50}) - +def preprocess_afm(ds: xr.Dataset) -> xr.Dataset: + latitude = ds['lat'].values + longitude = ds['lon'].values + # ds = ds.expand_dims(dim={"latitude": latitude, "longitude": longitude}, axis=(1,0)) + ds = ds.drop('lat') + ds = ds.drop('lon') + ds = ds.drop('certainty') + deduplicated = ds.expand_dims(dim={"latitude": latitude, "longitude": longitude}, axis=(1, 0)) + for dim in ds.dims: + indexes = {dim: ~deduplicated.get_index(dim).duplicated(keep='first')} + deduplicated = deduplicated.isel(indexes) + return deduplicated class CMCCAFMSource(GeokubeSource): name = "cmcc_afm_geokube" @@ -58,9 +57,7 @@ def __init__( super(CMCCAFMSource, self).__init__(metadata=metadata) def _open_dataset(self): - self._kube = DataCube.from_xarray( - post_process_afm( - open_datacube( + self._kube = open_datacube( path=self.path, id_pattern=self.field_id, metadata_caching=self.metadata_caching, @@ -68,7 +65,5 @@ def _open_dataset(self): mapping=self.mapping, **self.xarray_kwargs, preprocess=self.preprocess - ).to_xarray() - ) - ) + ) return self._kube From 360a37e9e4e11d4a2cfcbc2a6476e2f5ee5ee145 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Fri, 19 Jul 2024 10:28:53 +0200 Subject: [PATCH 20/53] moving reshape in post process --- drivers/intake_geokube/afm.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/drivers/intake_geokube/afm.py b/drivers/intake_geokube/afm.py index 44eeae0..b1a462c 100644 --- a/drivers/intake_geokube/afm.py +++ b/drivers/intake_geokube/afm.py @@ -12,7 +12,7 @@ from geokube.core.datacube import DataCube -def preprocess_afm(ds: xr.Dataset) -> xr.Dataset: +def postprocess_afm(ds: xr.Dataset) -> xr.Dataset: latitude = ds['lat'].values longitude = ds['lon'].values # ds = ds.expand_dims(dim={"latitude": latitude, "longitude": longitude}, axis=(1,0)) @@ -23,7 +23,8 @@ def preprocess_afm(ds: xr.Dataset) -> xr.Dataset: for dim in ds.dims: indexes 
= {dim: ~deduplicated.get_index(dim).duplicated(keep='first')} deduplicated = deduplicated.isel(indexes) - return deduplicated + return deduplicated.sortby('time').chunk({'time': 50 , 'latitude': 50, 'longitude': 50}) + class CMCCAFMSource(GeokubeSource): name = "cmcc_afm_geokube" @@ -53,17 +54,21 @@ def __init__( self.mapping = mapping self.xarray_kwargs = {} if xarray_kwargs is None else xarray_kwargs self.load_files_on_persistance = load_files_on_persistance - self.preprocess = preprocess_afm + # self.preprocess = preprocess_afm super(CMCCAFMSource, self).__init__(metadata=metadata) def _open_dataset(self): - self._kube = open_datacube( + self._kube = DataCube.from_xarray( + postprocess_afm( + open_datacube( path=self.path, id_pattern=self.field_id, metadata_caching=self.metadata_caching, metadata_cache_path=self.metadata_cache_path, mapping=self.mapping, **self.xarray_kwargs, - preprocess=self.preprocess - ) + # preprocess=self.preprocess + ).to_xarray() + ) + ) return self._kube From 7eac0dec4b39b14830d9a654cd7cea106d12d774 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Fri, 19 Jul 2024 12:35:18 +0200 Subject: [PATCH 21/53] changing chunks size --- drivers/intake_geokube/afm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/drivers/intake_geokube/afm.py b/drivers/intake_geokube/afm.py index b1a462c..54fb1b0 100644 --- a/drivers/intake_geokube/afm.py +++ b/drivers/intake_geokube/afm.py @@ -23,7 +23,7 @@ def postprocess_afm(ds: xr.Dataset) -> xr.Dataset: for dim in ds.dims: indexes = {dim: ~deduplicated.get_index(dim).duplicated(keep='first')} deduplicated = deduplicated.isel(indexes) - return deduplicated.sortby('time').chunk({'time': 50 , 'latitude': 50, 'longitude': 50}) + return deduplicated.sortby('time').chunk({'time': 24 , 'latitude': 24, 'longitude': 24}) class CMCCAFMSource(GeokubeSource): From 210c3f94a611a58d2580fad3a2a441d24124a1c9 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Fri, 19 Jul 2024 14:46:40 +0200 Subject: [PATCH 22/53] adding post process chunk parameter --- drivers/intake_geokube/afm.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/drivers/intake_geokube/afm.py b/drivers/intake_geokube/afm.py index 54fb1b0..20e2a80 100644 --- a/drivers/intake_geokube/afm.py +++ b/drivers/intake_geokube/afm.py @@ -12,7 +12,7 @@ from geokube.core.datacube import DataCube -def postprocess_afm(ds: xr.Dataset) -> xr.Dataset: +def postprocess_afm(ds: xr.Dataset, post_process_chunks) -> xr.Dataset: latitude = ds['lat'].values longitude = ds['lon'].values # ds = ds.expand_dims(dim={"latitude": latitude, "longitude": longitude}, axis=(1,0)) @@ -23,7 +23,7 @@ def postprocess_afm(ds: xr.Dataset) -> xr.Dataset: for dim in ds.dims: indexes = {dim: ~deduplicated.get_index(dim).duplicated(keep='first')} deduplicated = deduplicated.isel(indexes) - return deduplicated.sortby('time').chunk({'time': 24 , 'latitude': 24, 'longitude': 24}) + return deduplicated.sortby('time').chunk(post_process_chunks) class CMCCAFMSource(GeokubeSource): @@ -42,6 +42,7 @@ def __init__( metadata=None, mapping: Optional[Mapping[str, Mapping[str, str]]] = None, load_files_on_persistance: Optional[bool] = True, + postprocess_chunk: Optional = None ): self._kube = None self.path = path @@ -54,6 +55,7 @@ def __init__( self.mapping = mapping self.xarray_kwargs = {} if xarray_kwargs is None else xarray_kwargs self.load_files_on_persistance = load_files_on_persistance + self.postprocess_chunk = postprocess_chunk # self.preprocess = preprocess_afm 
super(CMCCAFMSource, self).__init__(metadata=metadata) @@ -68,7 +70,8 @@ def _open_dataset(self): mapping=self.mapping, **self.xarray_kwargs, # preprocess=self.preprocess - ).to_xarray() + ).to_xarray(), + self.postprocess_chunk ) ) return self._kube From 56fd4bba71744ed24b59c3d23ba9ced7a23c4066 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Fri, 19 Jul 2024 15:44:46 +0200 Subject: [PATCH 23/53] adding crs projection --- drivers/intake_geokube/afm.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/drivers/intake_geokube/afm.py b/drivers/intake_geokube/afm.py index 20e2a80..536507c 100644 --- a/drivers/intake_geokube/afm.py +++ b/drivers/intake_geokube/afm.py @@ -11,6 +11,7 @@ from geokube.core.datacube import DataCube +_PROJECTION = {"grid_mapping_name": "latitude_longitude"} def postprocess_afm(ds: xr.Dataset, post_process_chunks) -> xr.Dataset: latitude = ds['lat'].values @@ -23,8 +24,16 @@ def postprocess_afm(ds: xr.Dataset, post_process_chunks) -> xr.Dataset: for dim in ds.dims: indexes = {dim: ~deduplicated.get_index(dim).duplicated(keep='first')} deduplicated = deduplicated.isel(indexes) - return deduplicated.sortby('time').chunk(post_process_chunks) + return add_projection(deduplicated.sortby('time').chunk(post_process_chunks)) +def add_projection(dset: xr.Dataset, **kwargs) -> xr.Dataset: + """Add projection information to the dataset""" + coords = dset.coords + coords["crs"] = xr.DataArray(data=np.array(1), attrs=_PROJECTION) + for var in dset.data_vars.values(): + enc = var.encoding + enc["grid_mapping"] = "crs" + return dset class CMCCAFMSource(GeokubeSource): name = "cmcc_afm_geokube" From 04cc64bfebb4d4c4b33fe14defaf8ccd0c68bab4 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Fri, 19 Jul 2024 16:01:01 +0200 Subject: [PATCH 24/53] fix post process function was using wrong dataset dimensions --- drivers/intake_geokube/afm.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/drivers/intake_geokube/afm.py b/drivers/intake_geokube/afm.py index 536507c..08479bb 100644 --- a/drivers/intake_geokube/afm.py +++ b/drivers/intake_geokube/afm.py @@ -21,10 +21,10 @@ def postprocess_afm(ds: xr.Dataset, post_process_chunks) -> xr.Dataset: ds = ds.drop('lon') ds = ds.drop('certainty') deduplicated = ds.expand_dims(dim={"latitude": latitude, "longitude": longitude}, axis=(1, 0)) - for dim in ds.dims: + for dim in deduplicated.dims: indexes = {dim: ~deduplicated.get_index(dim).duplicated(keep='first')} deduplicated = deduplicated.isel(indexes) - return add_projection(deduplicated.sortby('time').chunk(post_process_chunks)) + return add_projection(deduplicated.sortby('time').sortby('latitude').sortby('longitude').chunk(post_process_chunks)) def add_projection(dset: xr.Dataset, **kwargs) -> xr.Dataset: """Add projection information to the dataset""" From 3fa9f271e1e4e6e4823170502996bbb54e4a7e37 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Mon, 22 Jul 2024 11:20:55 +0200 Subject: [PATCH 25/53] setting threads_per_worker param to 1 --- executor/app/main.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/executor/app/main.py b/executor/app/main.py index bf3f494..cd033be 100644 --- a/executor/app/main.py +++ b/executor/app/main.py @@ -266,6 +266,7 @@ def create_dask_cluster(self, dask_cluster_opts: dict = None): scheduler_port=dask_cluster_opts["scheduler_port"], dashboard_address=dask_cluster_opts["dashboard_address"], memory_limit=dask_cluster_opts["memory_limit"], + threads_per_worker=1 ) self._LOG.info( "creating Dask 
Client...", extra={"track_id": self._worker_id} @@ -451,6 +452,7 @@ def get_size(self, location_path): print("channel subscribe") for etype in executor_types: if etype == "query": + #TODO: create dask cluster with options executor.create_dask_cluster() executor.subscribe(etype) From 2e21cf2a38a3e590625b4858705b8d7aba5af0eb Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Mon, 22 Jul 2024 11:40:58 +0200 Subject: [PATCH 26/53] upgrade dask version --- executor/requirements.txt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/executor/requirements.txt b/executor/requirements.txt index f188e90..77fd2b0 100644 --- a/executor/requirements.txt +++ b/executor/requirements.txt @@ -1,4 +1,6 @@ pika==1.2.1 prometheus_client sqlalchemy -pydantic \ No newline at end of file +pydantic +dask==2024.7.1 +dask[distributed]==2024.7.1 \ No newline at end of file From 5fa752e294cf6ef76584cf3e106236c046bc14b6 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Mon, 22 Jul 2024 11:45:52 +0200 Subject: [PATCH 27/53] upgrade dask version --- executor/app/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/app/main.py b/executor/app/main.py index cd033be..d97c53b 100644 --- a/executor/app/main.py +++ b/executor/app/main.py @@ -265,7 +265,7 @@ def create_dask_cluster(self, dask_cluster_opts: dict = None): n_workers=dask_cluster_opts["n_workers"], scheduler_port=dask_cluster_opts["scheduler_port"], dashboard_address=dask_cluster_opts["dashboard_address"], - memory_limit=dask_cluster_opts["memory_limit"], + #memory_limit=dask_cluster_opts["memory_limit"], threads_per_worker=1 ) self._LOG.info( From 374ebb30e55cff81ef896e821b1aaa65223a3f8f Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Mon, 22 Jul 2024 12:05:12 +0200 Subject: [PATCH 28/53] setting dask workers to 4 --- executor/app/main.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/executor/app/main.py b/executor/app/main.py index d97c53b..ecefd8e 100644 --- a/executor/app/main.py +++ b/executor/app/main.py @@ -249,7 +249,7 @@ def create_dask_cluster(self, dask_cluster_opts: dict = None): dask_cluster_opts["processes"] = True port = int(os.getenv("DASK_DASHBOARD_PORT", 8787)) dask_cluster_opts["dashboard_address"] = f":{port}" - dask_cluster_opts["n_workers"] = None + dask_cluster_opts["n_workers"] = 4 dask_cluster_opts["memory_limit"] = "auto" self._worker_id = self._db.create_worker( status="enabled", @@ -262,9 +262,9 @@ def create_dask_cluster(self, dask_cluster_opts: dict = None): extra={"track_id": self._worker_id}, ) dask_cluster = LocalCluster( - n_workers=dask_cluster_opts["n_workers"], - scheduler_port=dask_cluster_opts["scheduler_port"], - dashboard_address=dask_cluster_opts["dashboard_address"], + n_workers=dask_cluster_opts['n_workers'], + #scheduler_port=dask_cluster_opts["scheduler_port"], + #dashboard_address=dask_cluster_opts["dashboard_address"], #memory_limit=dask_cluster_opts["memory_limit"], threads_per_worker=1 ) @@ -272,7 +272,7 @@ def create_dask_cluster(self, dask_cluster_opts: dict = None): "creating Dask Client...", extra={"track_id": self._worker_id} ) self._dask_client = Client(dask_cluster) - self._nanny = Nanny(self._dask_client.cluster.scheduler.address) + #self._nanny = Nanny(self._dask_client.cluster.scheduler.address) def maybe_restart_cluster(self, status: RequestStatus): if status is RequestStatus.TIMEOUT: From 6ae159bf35f6ed5fc63b8c750dc4013cc7641bec Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Mon, 22 Jul 2024 12:21:41 
+0200 Subject: [PATCH 29/53] setting dask workers to 1 --- executor/app/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/app/main.py b/executor/app/main.py index ecefd8e..bd0ed61 100644 --- a/executor/app/main.py +++ b/executor/app/main.py @@ -249,7 +249,7 @@ def create_dask_cluster(self, dask_cluster_opts: dict = None): dask_cluster_opts["processes"] = True port = int(os.getenv("DASK_DASHBOARD_PORT", 8787)) dask_cluster_opts["dashboard_address"] = f":{port}" - dask_cluster_opts["n_workers"] = 4 + dask_cluster_opts["n_workers"] = 1 dask_cluster_opts["memory_limit"] = "auto" self._worker_id = self._db.create_worker( status="enabled", From 158fa476fa6ef8eb8d5eae5a2aea1cf04c98e459 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Mon, 22 Jul 2024 12:46:03 +0200 Subject: [PATCH 30/53] removed dask cluster from executor --- executor/app/main.py | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/executor/app/main.py b/executor/app/main.py index bd0ed61..95fa375 100644 --- a/executor/app/main.py +++ b/executor/app/main.py @@ -195,7 +195,7 @@ def _persist_single_datacube(dataframe_item, base_path, format): return path -def process(message: Message, compute: bool): +async def process(message: Message, compute: bool): res_path = os.path.join(_BASE_DOWNLOAD_PATH, message.request_id) os.makedirs(res_path, exist_ok=True) match message.type: @@ -261,17 +261,17 @@ def create_dask_cluster(self, dask_cluster_opts: dict = None): dask_cluster_opts, extra={"track_id": self._worker_id}, ) - dask_cluster = LocalCluster( - n_workers=dask_cluster_opts['n_workers'], + #dask_cluster = LocalCluster( + # n_workers=dask_cluster_opts['n_workers'], #scheduler_port=dask_cluster_opts["scheduler_port"], #dashboard_address=dask_cluster_opts["dashboard_address"], #memory_limit=dask_cluster_opts["memory_limit"], - threads_per_worker=1 - ) + # threads_per_worker=1 + #) self._LOG.info( - "creating Dask Client...", extra={"track_id": self._worker_id} + "not creating Dask Client...", extra={"track_id": self._worker_id} ) - self._dask_client = Client(dask_cluster) + #self._dask_client = Client(dask_cluster) #self._nanny = Nanny(self._dask_client.cluster.scheduler.address) def maybe_restart_cluster(self, status: RequestStatus): @@ -357,7 +357,7 @@ def retry_until_timeout( ) status = RequestStatus.FAILED fail_reason = f"{type(e).__name__}: {str(e)}" - return (location_path, status, fail_reason) + return location_path, status, fail_reason def handle_message(self, connection, channel, delivery_tag, body): message: Message = Message(body) @@ -378,11 +378,14 @@ def handle_message(self, connection, channel, delivery_tag, body): "submitting job for workflow request", extra={"track_id": message.request_id}, ) - future = self._dask_client.submit( - process, - message=message, - compute=False, - ) + #future = self._dask_client.submit( + # process, + # message=message, + # compute=False, + #) + + future = asyncio.run(process(message,compute=False)) + location_path, status, fail_reason = self.retry_until_timeout( future, message=message, @@ -402,7 +405,7 @@ def handle_message(self, connection, channel, delivery_tag, body): cb = functools.partial(self.ack_message, channel, delivery_tag) connection.add_callback_threadsafe(cb) - self.maybe_restart_cluster(status) + #self.maybe_restart_cluster(status) self._LOG.debug( "request acknowledged", extra={"track_id": message.request_id} ) From e6aca539ee8a09e21f99289e89c8b41ab57a4816 Mon Sep 17 00:00:00 2001 From: 
Gabriele Tramonte Date: Mon, 22 Jul 2024 15:20:10 +0200 Subject: [PATCH 31/53] checking with geokube version 0.2.6b2 --- drivers/Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/drivers/Dockerfile b/drivers/Dockerfile index 4920f4e..96d15a7 100644 --- a/drivers/Dockerfile +++ b/drivers/Dockerfile @@ -1,6 +1,6 @@ ARG REGISTRY=rg.fr-par.scw.cloud/geokube -# ARG TAG=v0.2.6b1 -ARG TAG=2024.05.03.10.36 +ARG TAG=v0.2.6b2 +#ARG TAG=2024.05.03.10.36 FROM $REGISTRY/geokube:$TAG COPY dist/intake_geokube-0.1a0-py3-none-any.whl / From 79463dd7f09880b37b1b8e09aa9b2b68c69d67bc Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Mon, 22 Jul 2024 15:52:48 +0200 Subject: [PATCH 32/53] test with new geokube image and dask cluster --- drivers/Dockerfile | 4 ++-- executor/app/main.py | 26 +++++++++++++------------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/drivers/Dockerfile b/drivers/Dockerfile index 96d15a7..978eaa1 100644 --- a/drivers/Dockerfile +++ b/drivers/Dockerfile @@ -1,6 +1,6 @@ ARG REGISTRY=rg.fr-par.scw.cloud/geokube -ARG TAG=v0.2.6b2 -#ARG TAG=2024.05.03.10.36 +#ARG TAG=v0.2.6b2 +ARG TAG=2024.05.03.10.36 FROM $REGISTRY/geokube:$TAG COPY dist/intake_geokube-0.1a0-py3-none-any.whl / diff --git a/executor/app/main.py b/executor/app/main.py index 95fa375..587c712 100644 --- a/executor/app/main.py +++ b/executor/app/main.py @@ -261,18 +261,18 @@ def create_dask_cluster(self, dask_cluster_opts: dict = None): dask_cluster_opts, extra={"track_id": self._worker_id}, ) - #dask_cluster = LocalCluster( - # n_workers=dask_cluster_opts['n_workers'], + dask_cluster = LocalCluster( + n_workers=dask_cluster_opts['n_workers'], #scheduler_port=dask_cluster_opts["scheduler_port"], #dashboard_address=dask_cluster_opts["dashboard_address"], #memory_limit=dask_cluster_opts["memory_limit"], - # threads_per_worker=1 - #) + threads_per_worker=1 + ) self._LOG.info( "not creating Dask Client...", extra={"track_id": self._worker_id} ) - #self._dask_client = Client(dask_cluster) - #self._nanny = Nanny(self._dask_client.cluster.scheduler.address) + self._dask_client = Client(dask_cluster) + self._nanny = Nanny(self._dask_client.cluster.scheduler.address) def maybe_restart_cluster(self, status: RequestStatus): if status is RequestStatus.TIMEOUT: @@ -378,13 +378,13 @@ def handle_message(self, connection, channel, delivery_tag, body): "submitting job for workflow request", extra={"track_id": message.request_id}, ) - #future = self._dask_client.submit( - # process, - # message=message, - # compute=False, - #) + future = self._dask_client.submit( + process, + message=message, + compute=False, + ) - future = asyncio.run(process(message,compute=False)) + #future = asyncio.run(process(message,compute=False)) location_path, status, fail_reason = self.retry_until_timeout( future, @@ -405,7 +405,7 @@ def handle_message(self, connection, channel, delivery_tag, body): cb = functools.partial(self.ack_message, channel, delivery_tag) connection.add_callback_threadsafe(cb) - #self.maybe_restart_cluster(status) + self.maybe_restart_cluster(status) self._LOG.debug( "request acknowledged", extra={"track_id": message.request_id} ) From 9405931564eeef77f7b8746bd4862f3263d06f94 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Mon, 22 Jul 2024 16:02:30 +0200 Subject: [PATCH 33/53] removed async keyword from process function --- executor/app/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/app/main.py b/executor/app/main.py index 587c712..2728e01 100644 
--- a/executor/app/main.py +++ b/executor/app/main.py @@ -195,7 +195,7 @@ def _persist_single_datacube(dataframe_item, base_path, format): return path -async def process(message: Message, compute: bool): +def process(message: Message, compute: bool): res_path = os.path.join(_BASE_DOWNLOAD_PATH, message.request_id) os.makedirs(res_path, exist_ok=True) match message.type: From 00bf6a356beb5f870c9aba05c3102f4acc649845 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Mon, 22 Jul 2024 16:15:49 +0200 Subject: [PATCH 34/53] adding certainty back in the driver --- drivers/intake_geokube/afm.py | 2 +- executor/requirements.txt | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/drivers/intake_geokube/afm.py b/drivers/intake_geokube/afm.py index 08479bb..29ca4fd 100644 --- a/drivers/intake_geokube/afm.py +++ b/drivers/intake_geokube/afm.py @@ -19,7 +19,7 @@ def postprocess_afm(ds: xr.Dataset, post_process_chunks) -> xr.Dataset: # ds = ds.expand_dims(dim={"latitude": latitude, "longitude": longitude}, axis=(1,0)) ds = ds.drop('lat') ds = ds.drop('lon') - ds = ds.drop('certainty') + #ds = ds.drop('certainty') deduplicated = ds.expand_dims(dim={"latitude": latitude, "longitude": longitude}, axis=(1, 0)) for dim in deduplicated.dims: indexes = {dim: ~deduplicated.get_index(dim).duplicated(keep='first')} diff --git a/executor/requirements.txt b/executor/requirements.txt index 77fd2b0..f188e90 100644 --- a/executor/requirements.txt +++ b/executor/requirements.txt @@ -1,6 +1,4 @@ pika==1.2.1 prometheus_client sqlalchemy -pydantic -dask==2024.7.1 -dask[distributed]==2024.7.1 \ No newline at end of file +pydantic \ No newline at end of file From b5dea5cb839a630604817349a34cedc94e0fe6ee Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Thu, 25 Jul 2024 08:54:14 +0200 Subject: [PATCH 35/53] adding pattern handling for filters --- drivers/intake_geokube/afm.py | 43 ++++++++++++++++++++++++----------- 1 file changed, 30 insertions(+), 13 deletions(-) diff --git a/drivers/intake_geokube/afm.py b/drivers/intake_geokube/afm.py index 29ca4fd..68a32d8 100644 --- a/drivers/intake_geokube/afm.py +++ b/drivers/intake_geokube/afm.py @@ -69,18 +69,35 @@ def __init__( super(CMCCAFMSource, self).__init__(metadata=metadata) def _open_dataset(self): - self._kube = DataCube.from_xarray( - postprocess_afm( - open_datacube( - path=self.path, - id_pattern=self.field_id, - metadata_caching=self.metadata_caching, - metadata_cache_path=self.metadata_cache_path, - mapping=self.mapping, - **self.xarray_kwargs, - # preprocess=self.preprocess - ).to_xarray(), - self.postprocess_chunk + if self.pattern is None: + self._kube = DataCube.from_xarray( + postprocess_afm( + open_datacube( + path=self.path, + id_pattern=self.field_id, + metadata_caching=self.metadata_caching, + metadata_cache_path=self.metadata_cache_path, + mapping=self.mapping, + **self.xarray_kwargs, + # preprocess=self.preprocess + ).to_xarray(), + self.postprocess_chunk + ) + ) + else: + self._kube = DataCube.from_xarray( + postprocess_afm( + open_dataset( + path=self.path, + pattern=self.pattern, + id_pattern=self.field_id, + metadata_caching=self.metadata_caching, + metadata_cache_path=self.metadata_cache_path, + mapping=self.mapping, + **self.xarray_kwargs, + # preprocess=self.preprocess + ).to_xarray(), + self.postprocess_chunk + ) ) - ) return self._kube From c30092574e64dd37b785e236c0e6cf9d6beea585 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Thu, 25 Jul 2024 10:50:12 +0200 Subject: [PATCH 36/53] resolving patter before using 
the open_datacube to get the list of files matching the pattern --- drivers/intake_geokube/afm.py | 36 +++++++++++++++++++++++++++++++---- 1 file changed, 32 insertions(+), 4 deletions(-) diff --git a/drivers/intake_geokube/afm.py b/drivers/intake_geokube/afm.py index 68a32d8..ba3e700 100644 --- a/drivers/intake_geokube/afm.py +++ b/drivers/intake_geokube/afm.py @@ -1,14 +1,18 @@ """geokube driver for intake.""" +import glob import logging from functools import partial from typing import Any, Mapping, Optional, Union import numpy as np +import pandas as pd import xarray as xr +from geokube.backend.netcdf import FILES_COL from .base import GeokubeSource from geokube import open_datacube, open_dataset - +from intake.source.utils import reverse_format +from string import Formatter from geokube.core.datacube import DataCube _PROJECTION = {"grid_mapping_name": "latitude_longitude"} @@ -35,6 +39,27 @@ def add_projection(dset: xr.Dataset, **kwargs) -> xr.Dataset: enc["grid_mapping"] = "crs" return dset +def _get_ds_attrs_names(pattern): + fmt = Formatter() + # get the dataset attrs from the pattern + ds_attr_names = [i[1] for i in fmt.parse(pattern) if i[1]] + return ds_attr_names + + +def _get_df_from_files_list(files, pattern, ds_attr_names): + l = [] + for f in files: + d = reverse_format(pattern, f) + d[FILES_COL] = f + l.append(d) + df = pd.DataFrame(l) + if len(l) == 0: + raise ValueError(f"No files found for the provided path!") + # unique index for each dataset attribute combos - we create a list of files + df = df.groupby(ds_attr_names)[FILES_COL].apply(list).reset_index() + df = df.set_index(ds_attr_names) + return df + class CMCCAFMSource(GeokubeSource): name = "cmcc_afm_geokube" @@ -85,11 +110,14 @@ def _open_dataset(self): ) ) else: + ds_attr_names = _get_ds_attrs_names(self.pattern) + files = glob.glob(self.path) # all files + df = _get_df_from_files_list(files, self.pattern, ds_attr_names) + self._kube = DataCube.from_xarray( postprocess_afm( - open_dataset( - path=self.path, - pattern=self.pattern, + open_datacube( + path=df[FILES_COL], id_pattern=self.field_id, metadata_caching=self.metadata_caching, metadata_cache_path=self.metadata_cache_path, From e4b1307899241a673c53526883f45811c92d5e2b Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Thu, 25 Jul 2024 11:02:25 +0200 Subject: [PATCH 37/53] fix path should be a list of files not a Series --- drivers/intake_geokube/afm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/drivers/intake_geokube/afm.py b/drivers/intake_geokube/afm.py index ba3e700..6ac5ff3 100644 --- a/drivers/intake_geokube/afm.py +++ b/drivers/intake_geokube/afm.py @@ -117,7 +117,7 @@ def _open_dataset(self): self._kube = DataCube.from_xarray( postprocess_afm( open_datacube( - path=df[FILES_COL], + path=df[FILES_COL].to_list(), id_pattern=self.field_id, metadata_caching=self.metadata_caching, metadata_cache_path=self.metadata_cache_path, From 997eb1372a6464cf0f8fa886c228b437040d80a2 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Thu, 25 Jul 2024 11:36:10 +0200 Subject: [PATCH 38/53] using geokube.Dataset apply function to apply postprocess on all Datacubes --- drivers/intake_geokube/afm.py | 35 +++++++++++++++-------------------- 1 file changed, 15 insertions(+), 20 deletions(-) diff --git a/drivers/intake_geokube/afm.py b/drivers/intake_geokube/afm.py index 6ac5ff3..4658bc9 100644 --- a/drivers/intake_geokube/afm.py +++ b/drivers/intake_geokube/afm.py @@ -3,7 +3,7 @@ import logging from functools import partial from typing import 
Any, Mapping, Optional, Union - +import geokube import numpy as np import pandas as pd import xarray as xr @@ -17,18 +17,22 @@ _PROJECTION = {"grid_mapping_name": "latitude_longitude"} -def postprocess_afm(ds: xr.Dataset, post_process_chunks) -> xr.Dataset: +def postprocess_afm(ds: xr.Dataset, **post_process_chunks): + if isinstance(ds, geokube.core.datacube.DataCube): + ds = ds.to_xarray() latitude = ds['lat'].values longitude = ds['lon'].values # ds = ds.expand_dims(dim={"latitude": latitude, "longitude": longitude}, axis=(1,0)) ds = ds.drop('lat') ds = ds.drop('lon') - #ds = ds.drop('certainty') + # ds = ds.drop('certainty') deduplicated = ds.expand_dims(dim={"latitude": latitude, "longitude": longitude}, axis=(1, 0)) + # print(deduplicated.dims) for dim in deduplicated.dims: indexes = {dim: ~deduplicated.get_index(dim).duplicated(keep='first')} deduplicated = deduplicated.isel(indexes) - return add_projection(deduplicated.sortby('time').sortby('latitude').sortby('longitude').chunk(post_process_chunks)) + return DataCube.from_xarray( + deduplicated.sortby('time').sortby('latitude').sortby('longitude').chunk(post_process_chunks)) def add_projection(dset: xr.Dataset, **kwargs) -> xr.Dataset: """Add projection information to the dataset""" @@ -95,7 +99,7 @@ def __init__( def _open_dataset(self): if self.pattern is None: - self._kube = DataCube.from_xarray( + self._kube =\ postprocess_afm( open_datacube( path=self.path, @@ -105,27 +109,18 @@ def _open_dataset(self): mapping=self.mapping, **self.xarray_kwargs, # preprocess=self.preprocess - ).to_xarray(), - self.postprocess_chunk + ), + **self.postprocess_chunk ) - ) else: - ds_attr_names = _get_ds_attrs_names(self.pattern) - files = glob.glob(self.path) # all files - df = _get_df_from_files_list(files, self.pattern, ds_attr_names) - - self._kube = DataCube.from_xarray( - postprocess_afm( - open_datacube( - path=df[FILES_COL].to_list(), + self._kube = open_dataset( + path=self.path, + pattern=self.pattern, id_pattern=self.field_id, metadata_caching=self.metadata_caching, metadata_cache_path=self.metadata_cache_path, mapping=self.mapping, **self.xarray_kwargs, # preprocess=self.preprocess - ).to_xarray(), - self.postprocess_chunk - ) - ) + ).apply(postprocess_afm,**self.postprocess_chunk) return self._kube From 81b86500e2eee65f27ff25b8e53cdc210ae0b2b6 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Thu, 25 Jul 2024 15:29:34 +0200 Subject: [PATCH 39/53] removed unused functions --- drivers/intake_geokube/afm.py | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/drivers/intake_geokube/afm.py b/drivers/intake_geokube/afm.py index 4658bc9..8e22232 100644 --- a/drivers/intake_geokube/afm.py +++ b/drivers/intake_geokube/afm.py @@ -43,26 +43,6 @@ def add_projection(dset: xr.Dataset, **kwargs) -> xr.Dataset: enc["grid_mapping"] = "crs" return dset -def _get_ds_attrs_names(pattern): - fmt = Formatter() - # get the dataset attrs from the pattern - ds_attr_names = [i[1] for i in fmt.parse(pattern) if i[1]] - return ds_attr_names - - -def _get_df_from_files_list(files, pattern, ds_attr_names): - l = [] - for f in files: - d = reverse_format(pattern, f) - d[FILES_COL] = f - l.append(d) - df = pd.DataFrame(l) - if len(l) == 0: - raise ValueError(f"No files found for the provided path!") - # unique index for each dataset attribute combos - we create a list of files - df = df.groupby(ds_attr_names)[FILES_COL].apply(list).reset_index() - df = df.set_index(ds_attr_names) - return df class CMCCAFMSource(GeokubeSource): name = 
"cmcc_afm_geokube" From 0708a7200296197003d6cdb1d9658b5391022b79 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Thu, 25 Jul 2024 15:29:48 +0200 Subject: [PATCH 40/53] setting worker number to 4 --- executor/app/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/app/main.py b/executor/app/main.py index 2728e01..5282fd9 100644 --- a/executor/app/main.py +++ b/executor/app/main.py @@ -249,7 +249,7 @@ def create_dask_cluster(self, dask_cluster_opts: dict = None): dask_cluster_opts["processes"] = True port = int(os.getenv("DASK_DASHBOARD_PORT", 8787)) dask_cluster_opts["dashboard_address"] = f":{port}" - dask_cluster_opts["n_workers"] = 1 + dask_cluster_opts["n_workers"] = 4 dask_cluster_opts["memory_limit"] = "auto" self._worker_id = self._db.create_worker( status="enabled", From 9f77ab2e81266cda0eacc2b1a6e2a5e376119679 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Fri, 26 Jul 2024 11:19:11 +0200 Subject: [PATCH 41/53] applying resampling to datacubes --- drivers/intake_geokube/afm.py | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/drivers/intake_geokube/afm.py b/drivers/intake_geokube/afm.py index 8e22232..62cdc21 100644 --- a/drivers/intake_geokube/afm.py +++ b/drivers/intake_geokube/afm.py @@ -1,18 +1,11 @@ """geokube driver for intake.""" -import glob -import logging -from functools import partial -from typing import Any, Mapping, Optional, Union + +from typing import Mapping, Optional import geokube import numpy as np -import pandas as pd import xarray as xr -from geokube.backend.netcdf import FILES_COL - from .base import GeokubeSource from geokube import open_datacube, open_dataset -from intake.source.utils import reverse_format -from string import Formatter from geokube.core.datacube import DataCube _PROJECTION = {"grid_mapping_name": "latitude_longitude"} @@ -25,7 +18,7 @@ def postprocess_afm(ds: xr.Dataset, **post_process_chunks): # ds = ds.expand_dims(dim={"latitude": latitude, "longitude": longitude}, axis=(1,0)) ds = ds.drop('lat') ds = ds.drop('lon') - # ds = ds.drop('certainty') + ds = ds.drop('certainty') deduplicated = ds.expand_dims(dim={"latitude": latitude, "longitude": longitude}, axis=(1, 0)) # print(deduplicated.dims) for dim in deduplicated.dims: @@ -91,7 +84,7 @@ def _open_dataset(self): # preprocess=self.preprocess ), **self.postprocess_chunk - ) + ).resample('maximum', frequency='1H') else: self._kube = open_dataset( path=self.path, @@ -102,5 +95,5 @@ def _open_dataset(self): mapping=self.mapping, **self.xarray_kwargs, # preprocess=self.preprocess - ).apply(postprocess_afm,**self.postprocess_chunk) + ).apply(postprocess_afm,**self.postprocess_chunk).resample('maximum', frequency='1H') return self._kube From 9876f1d1c7386d0df44e67cb6d8157b92fb7ff12 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Fri, 26 Jul 2024 11:37:17 +0200 Subject: [PATCH 42/53] removed chunk in postprocess --- drivers/intake_geokube/afm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/drivers/intake_geokube/afm.py b/drivers/intake_geokube/afm.py index 62cdc21..c02479e 100644 --- a/drivers/intake_geokube/afm.py +++ b/drivers/intake_geokube/afm.py @@ -25,7 +25,7 @@ def postprocess_afm(ds: xr.Dataset, **post_process_chunks): indexes = {dim: ~deduplicated.get_index(dim).duplicated(keep='first')} deduplicated = deduplicated.isel(indexes) return DataCube.from_xarray( - deduplicated.sortby('time').sortby('latitude').sortby('longitude').chunk(post_process_chunks)) + 
deduplicated.sortby('time').sortby('latitude').sortby('longitude')) def add_projection(dset: xr.Dataset, **kwargs) -> xr.Dataset: """Add projection information to the dataset""" From 7cda4fdf17d614201724c9d3c5cb7976d71c4d35 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Fri, 26 Jul 2024 14:34:51 +0200 Subject: [PATCH 43/53] applying chunking again --- drivers/intake_geokube/afm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/drivers/intake_geokube/afm.py b/drivers/intake_geokube/afm.py index c02479e..62cdc21 100644 --- a/drivers/intake_geokube/afm.py +++ b/drivers/intake_geokube/afm.py @@ -25,7 +25,7 @@ def postprocess_afm(ds: xr.Dataset, **post_process_chunks): indexes = {dim: ~deduplicated.get_index(dim).duplicated(keep='first')} deduplicated = deduplicated.isel(indexes) return DataCube.from_xarray( - deduplicated.sortby('time').sortby('latitude').sortby('longitude')) + deduplicated.sortby('time').sortby('latitude').sortby('longitude').chunk(post_process_chunks)) def add_projection(dset: xr.Dataset, **kwargs) -> xr.Dataset: """Add projection information to the dataset""" From 533fdff281643c206e7dbf5b5084cb4f98995933 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Fri, 26 Jul 2024 14:35:19 +0200 Subject: [PATCH 44/53] setting number of thread per worker to 8 --- executor/app/main.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/executor/app/main.py b/executor/app/main.py index 5282fd9..e0dce23 100644 --- a/executor/app/main.py +++ b/executor/app/main.py @@ -249,8 +249,9 @@ def create_dask_cluster(self, dask_cluster_opts: dict = None): dask_cluster_opts["processes"] = True port = int(os.getenv("DASK_DASHBOARD_PORT", 8787)) dask_cluster_opts["dashboard_address"] = f":{port}" - dask_cluster_opts["n_workers"] = 4 + dask_cluster_opts["n_workers"] = 1 dask_cluster_opts["memory_limit"] = "auto" + dask_cluster_opts['thread_per_worker'] = 8 self._worker_id = self._db.create_worker( status="enabled", dask_scheduler_port=dask_cluster_opts["scheduler_port"], @@ -266,7 +267,7 @@ def create_dask_cluster(self, dask_cluster_opts: dict = None): #scheduler_port=dask_cluster_opts["scheduler_port"], #dashboard_address=dask_cluster_opts["dashboard_address"], #memory_limit=dask_cluster_opts["memory_limit"], - threads_per_worker=1 + threads_per_worker=dask_cluster_opts['thread_per_worker'], ) self._LOG.info( "not creating Dask Client...", extra={"track_id": self._worker_id} From 991f9cfda3afa39177e36cacf63e50b9518c91ce Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Mon, 29 Jul 2024 10:33:41 +0200 Subject: [PATCH 45/53] adding options for Dask cluster --- executor/app/main.py | 38 +++++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/executor/app/main.py b/executor/app/main.py index e0dce23..508ee62 100644 --- a/executor/app/main.py +++ b/executor/app/main.py @@ -231,7 +231,7 @@ def process(message: Message, compute: bool): class Executor(metaclass=LoggableMeta): _LOG = logging.getLogger("geokube.Executor") - def __init__(self, broker, store_path): + def __init__(self, broker, store_path, dask_cluster_opts): self._store = store_path broker_conn = pika.BlockingConnection( pika.ConnectionParameters(host=broker, heartbeat=10), @@ -239,19 +239,12 @@ def __init__(self, broker, store_path): self._conn = broker_conn self._channel = broker_conn.channel() self._db = DBManager() + self.dask_cluster_opts = dask_cluster_opts def create_dask_cluster(self, dask_cluster_opts: dict = None): if dask_cluster_opts 
is None: - dask_cluster_opts = {} - dask_cluster_opts["scheduler_port"] = int( - os.getenv("DASK_SCHEDULER_PORT", 8188) - ) - dask_cluster_opts["processes"] = True - port = int(os.getenv("DASK_DASHBOARD_PORT", 8787)) - dask_cluster_opts["dashboard_address"] = f":{port}" - dask_cluster_opts["n_workers"] = 1 - dask_cluster_opts["memory_limit"] = "auto" - dask_cluster_opts['thread_per_worker'] = 8 + dask_cluster_opts = self.dask_cluster_opts + self._worker_id = self._db.create_worker( status="enabled", dask_scheduler_port=dask_cluster_opts["scheduler_port"], @@ -264,13 +257,13 @@ def create_dask_cluster(self, dask_cluster_opts: dict = None): ) dask_cluster = LocalCluster( n_workers=dask_cluster_opts['n_workers'], - #scheduler_port=dask_cluster_opts["scheduler_port"], - #dashboard_address=dask_cluster_opts["dashboard_address"], - #memory_limit=dask_cluster_opts["memory_limit"], + scheduler_port=dask_cluster_opts["scheduler_port"], + dashboard_address=dask_cluster_opts["dashboard_address"], + memory_limit=dask_cluster_opts["memory_limit"], threads_per_worker=dask_cluster_opts['thread_per_worker'], ) self._LOG.info( - "not creating Dask Client...", extra={"track_id": self._worker_id} + "creating Dask Client...", extra={"track_id": self._worker_id} ) self._dask_client = Client(dask_cluster) self._nanny = Nanny(self._dask_client.cluster.scheduler.address) @@ -452,11 +445,22 @@ def get_size(self, location_path): executor_types = os.getenv("EXECUTOR_TYPES", "query").split(",") store_path = os.getenv("STORE_PATH", ".") - executor = Executor(broker=broker, store_path=store_path) + dask_cluster_opts = {} + dask_cluster_opts["scheduler_port"] = int( + os.getenv("DASK_SCHEDULER_PORT", 8188) + ) + dask_cluster_opts["processes"] = True + port = int(os.getenv("DASK_DASHBOARD_PORT", 8787)) + dask_cluster_opts["dashboard_address"] = f":{port}" + dask_cluster_opts["n_workers"] = os.getenv("DASK_N_WORKERS", 1) + dask_cluster_opts["memory_limit"] = os.getenv("DASK_MEMORY_LIMIT", "auto") + dask_cluster_opts['thread_per_worker'] = os.getenv("DASK_THREADS_PER_WORKER", 8) + + + executor = Executor(broker=broker, store_path=store_path, dask_cluster_opts=dask_cluster_opts) print("channel subscribe") for etype in executor_types: if etype == "query": - #TODO: create dask cluster with options executor.create_dask_cluster() executor.subscribe(etype) From 15dbb9cb0cd74be4fa28e380b9291bda1d3d4b32 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Mon, 29 Jul 2024 10:45:35 +0200 Subject: [PATCH 46/53] n_workers and thread per worker should be integer --- executor/app/main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/executor/app/main.py b/executor/app/main.py index 508ee62..5daf7e6 100644 --- a/executor/app/main.py +++ b/executor/app/main.py @@ -452,9 +452,9 @@ def get_size(self, location_path): dask_cluster_opts["processes"] = True port = int(os.getenv("DASK_DASHBOARD_PORT", 8787)) dask_cluster_opts["dashboard_address"] = f":{port}" - dask_cluster_opts["n_workers"] = os.getenv("DASK_N_WORKERS", 1) + dask_cluster_opts["n_workers"] = int(os.getenv("DASK_N_WORKERS", 1)) dask_cluster_opts["memory_limit"] = os.getenv("DASK_MEMORY_LIMIT", "auto") - dask_cluster_opts['thread_per_worker'] = os.getenv("DASK_THREADS_PER_WORKER", 8) + dask_cluster_opts['thread_per_worker'] = int(os.getenv("DASK_THREADS_PER_WORKER", 8)) executor = Executor(broker=broker, store_path=store_path, dask_cluster_opts=dask_cluster_opts) From 0e129e9f6af95bc656a4e191ddb2e2b7358c9966 Mon Sep 17 00:00:00 2001 From: Gabriele 
Tramonte Date: Tue, 30 Jul 2024 11:16:44 +0200 Subject: [PATCH 47/53] Adding healtchecks for API pods via cron --- api/Dockerfile | 10 ++++++++++ api/healtcheck.cron | 1 + api/healtcheck.sh | 1 + 3 files changed, 12 insertions(+) create mode 100644 api/healtcheck.cron create mode 100644 api/healtcheck.sh diff --git a/api/Dockerfile b/api/Dockerfile index c038cb3..310e7cc 100644 --- a/api/Dockerfile +++ b/api/Dockerfile @@ -1,9 +1,19 @@ ARG REGISTRY=rg.fr-par.scw.cloud/geolake ARG TAG=latest FROM $REGISTRY/geolake-datastore:$TAG + +RUN apt update && apt install -y cron + WORKDIR /app COPY requirements.txt /code/requirements.txt RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt COPY app /app EXPOSE 80 + +COPY ./healtcheck.* /opt/ + +RUN chmod +x /opt/healtcheck.sh +RUN crontab -u root /opt/healtcheck.cron +RUN service cron start + CMD ["uvicorn", "app.main:app", "--proxy-headers", "--host", "0.0.0.0", "--port", "80"] diff --git a/api/healtcheck.cron b/api/healtcheck.cron new file mode 100644 index 0000000..e11f6da --- /dev/null +++ b/api/healtcheck.cron @@ -0,0 +1 @@ +*/10 * * * * bash -c '/opt/healtcheck.sh' \ No newline at end of file diff --git a/api/healtcheck.sh b/api/healtcheck.sh new file mode 100644 index 0000000..4d8e479 --- /dev/null +++ b/api/healtcheck.sh @@ -0,0 +1 @@ +curl https://hc-ping.com/$HEALTCHECKS \ No newline at end of file From 626ecd5a196c92fca19c89c6faf2b4465d68f181 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Tue, 30 Jul 2024 11:23:36 +0200 Subject: [PATCH 48/53] missing new line at EOF --- api/healtcheck.cron | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/healtcheck.cron b/api/healtcheck.cron index e11f6da..e36bf23 100644 --- a/api/healtcheck.cron +++ b/api/healtcheck.cron @@ -1 +1 @@ -*/10 * * * * bash -c '/opt/healtcheck.sh' \ No newline at end of file +*/10 * * * * bash -c '/opt/healtcheck.sh' From a55c25afa8e7ca095483a34a67177e3c8890b941 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Tue, 30 Jul 2024 11:59:06 +0200 Subject: [PATCH 49/53] launching service cron at API start --- api/Dockerfile | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/api/Dockerfile b/api/Dockerfile index 310e7cc..cf1ccc2 100644 --- a/api/Dockerfile +++ b/api/Dockerfile @@ -16,4 +16,6 @@ RUN chmod +x /opt/healtcheck.sh RUN crontab -u root /opt/healtcheck.cron RUN service cron start -CMD ["uvicorn", "app.main:app", "--proxy-headers", "--host", "0.0.0.0", "--port", "80"] +#CMD ["uvicorn", "app.main:app", "--proxy-headers", "--host", "0.0.0.0", "--port", "80"] + +ENTRYPOINT uvicorn app.main:app --proxy-headers --host 0.0.0.0 --port 80 && service cron start \ No newline at end of file From a8b045e0e4c162a8768a2c972add40c4faf27e4f Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Tue, 30 Jul 2024 12:10:59 +0200 Subject: [PATCH 50/53] starting service first --- api/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/Dockerfile b/api/Dockerfile index cf1ccc2..e3b399d 100644 --- a/api/Dockerfile +++ b/api/Dockerfile @@ -18,4 +18,4 @@ RUN service cron start #CMD ["uvicorn", "app.main:app", "--proxy-headers", "--host", "0.0.0.0", "--port", "80"] -ENTRYPOINT uvicorn app.main:app --proxy-headers --host 0.0.0.0 --port 80 && service cron start \ No newline at end of file +ENTRYPOINT service cron start && uvicorn app.main:app --proxy-headers --host 0.0.0.0 --port 80 \ No newline at end of file From e17dfef2aa09ba13d4cd5399a74c0da32d817b82 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: 
Tue, 30 Jul 2024 12:38:00 +0200 Subject: [PATCH 51/53] modifying entrypoint --- api/Dockerfile | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/api/Dockerfile b/api/Dockerfile index e3b399d..648554d 100644 --- a/api/Dockerfile +++ b/api/Dockerfile @@ -14,8 +14,7 @@ COPY ./healtcheck.* /opt/ RUN chmod +x /opt/healtcheck.sh RUN crontab -u root /opt/healtcheck.cron -RUN service cron start #CMD ["uvicorn", "app.main:app", "--proxy-headers", "--host", "0.0.0.0", "--port", "80"] -ENTRYPOINT service cron start && uvicorn app.main:app --proxy-headers --host 0.0.0.0 --port 80 \ No newline at end of file +ENTRYPOINT service cron start && ./../wait-for-it.sh $(BROKER_SERVICE_HOST):5672 -- uvicorn main:app --host 0.0.0.0 --port '80' \ No newline at end of file From 38fe6dfd8df3b270123cb00fea73d60d36f61b77 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Tue, 30 Jul 2024 16:21:23 +0200 Subject: [PATCH 52/53] missing curl in api image --- api/Dockerfile | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/api/Dockerfile b/api/Dockerfile index 648554d..ad73842 100644 --- a/api/Dockerfile +++ b/api/Dockerfile @@ -2,7 +2,7 @@ ARG REGISTRY=rg.fr-par.scw.cloud/geolake ARG TAG=latest FROM $REGISTRY/geolake-datastore:$TAG -RUN apt update && apt install -y cron +RUN apt update && apt install -y cron curl WORKDIR /app COPY requirements.txt /code/requirements.txt @@ -15,6 +15,5 @@ COPY ./healtcheck.* /opt/ RUN chmod +x /opt/healtcheck.sh RUN crontab -u root /opt/healtcheck.cron -#CMD ["uvicorn", "app.main:app", "--proxy-headers", "--host", "0.0.0.0", "--port", "80"] +CMD ["uvicorn", "app.main:app", "--proxy-headers", "--host", "0.0.0.0", "--port", "80"] -ENTRYPOINT service cron start && ./../wait-for-it.sh $(BROKER_SERVICE_HOST):5672 -- uvicorn main:app --host 0.0.0.0 --port '80' \ No newline at end of file From 67e6c9bb0049f799da1380b5be3153626c1fa3b2 Mon Sep 17 00:00:00 2001 From: Gabriele Tramonte Date: Wed, 31 Jul 2024 09:31:55 +0200 Subject: [PATCH 53/53] adding healtchek to executors --- executor/Dockerfile | 10 ++++++++++ executor/healtcheck.cron | 1 + executor/healtcheck.sh | 1 + 3 files changed, 12 insertions(+) create mode 100644 executor/healtcheck.cron create mode 100644 executor/healtcheck.sh diff --git a/executor/Dockerfile b/executor/Dockerfile index 3888c93..e3404e5 100644 --- a/executor/Dockerfile +++ b/executor/Dockerfile @@ -1,8 +1,18 @@ ARG REGISTRY=rg.fr-par.scw.cloud/geolake ARG TAG=latest FROM $REGISTRY/geolake-datastore:$TAG + +RUN apt update && apt install -y cron curl + + WORKDIR /app COPY requirements.txt /code/requirements.txt RUN pip install --no-cache-dir -r /code/requirements.txt COPY app /app + +COPY ./healtcheck.* /opt/ + +RUN chmod +x /opt/healtcheck.sh +RUN crontab -u root /opt/healtcheck.cron + CMD [ "python", "main.py" ] diff --git a/executor/healtcheck.cron b/executor/healtcheck.cron new file mode 100644 index 0000000..e36bf23 --- /dev/null +++ b/executor/healtcheck.cron @@ -0,0 +1 @@ +*/10 * * * * bash -c '/opt/healtcheck.sh' diff --git a/executor/healtcheck.sh b/executor/healtcheck.sh new file mode 100644 index 0000000..4d8e479 --- /dev/null +++ b/executor/healtcheck.sh @@ -0,0 +1 @@ +curl https://hc-ping.com/$HEALTCHECKS \ No newline at end of file
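
For reference, below is a minimal sketch of how the environment-driven Dask options introduced in PATCH 45/53 and made integer-safe in PATCH 46/53 fit together. The environment variable names and defaults (DASK_SCHEDULER_PORT, DASK_DASHBOARD_PORT, DASK_N_WORKERS, DASK_MEMORY_LIMIT, DASK_THREADS_PER_WORKER) are taken from the patches themselves; the standalone script and the build_dask_cluster_opts helper are illustrative assumptions and do not exist in the repository.

# Illustrative sketch only: mirrors how executor/app/main.py builds
# LocalCluster options from environment variables after PATCH 46/53.
import os

from dask.distributed import Client, LocalCluster


def build_dask_cluster_opts() -> dict:
    """Collect LocalCluster options from the environment, using the same
    defaults as the patches above (hypothetical helper, for illustration)."""
    return {
        "scheduler_port": int(os.getenv("DASK_SCHEDULER_PORT", 8188)),
        "dashboard_address": f":{int(os.getenv('DASK_DASHBOARD_PORT', 8787))}",
        "n_workers": int(os.getenv("DASK_N_WORKERS", 1)),
        "memory_limit": os.getenv("DASK_MEMORY_LIMIT", "auto"),
        "threads_per_worker": int(os.getenv("DASK_THREADS_PER_WORKER", 8)),
        "processes": True,
    }


if __name__ == "__main__":
    opts = build_dask_cluster_opts()
    # Same keyword arguments as the LocalCluster call in create_dask_cluster.
    cluster = LocalCluster(
        n_workers=opts["n_workers"],
        scheduler_port=opts["scheduler_port"],
        dashboard_address=opts["dashboard_address"],
        memory_limit=opts["memory_limit"],
        threads_per_worker=opts["threads_per_worker"],
        processes=opts["processes"],
    )
    client = Client(cluster)
    print(client.dashboard_link)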