diff --git a/.gitignore b/.gitignore
index 2c15e0a..1856070 100644
--- a/.gitignore
+++ b/.gitignore
@@ -112,3 +112,4 @@
 venv.bak/
 _catalogs/
 _old/
+.DS_Store
\ No newline at end of file
diff --git a/api/Dockerfile b/api/Dockerfile
index c038cb3..ad73842 100644
--- a/api/Dockerfile
+++ b/api/Dockerfile
@@ -1,9 +1,19 @@
 ARG REGISTRY=rg.fr-par.scw.cloud/geolake
 ARG TAG=latest
 FROM $REGISTRY/geolake-datastore:$TAG
+
+RUN apt update && apt install -y cron curl
+
 WORKDIR /app
 COPY requirements.txt /code/requirements.txt
 RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt
 COPY app /app
 EXPOSE 80
+
+COPY ./healtcheck.* /opt/
+
+RUN chmod +x /opt/healtcheck.sh
+RUN crontab -u root /opt/healtcheck.cron
+
 CMD ["uvicorn", "app.main:app", "--proxy-headers", "--host", "0.0.0.0", "--port", "80"]
+
diff --git a/api/healtcheck.cron b/api/healtcheck.cron
new file mode 100644
index 0000000..e36bf23
--- /dev/null
+++ b/api/healtcheck.cron
@@ -0,0 +1 @@
+*/10 * * * * bash -c '/opt/healtcheck.sh'
diff --git a/api/healtcheck.sh b/api/healtcheck.sh
new file mode 100644
index 0000000..4d8e479
--- /dev/null
+++ b/api/healtcheck.sh
@@ -0,0 +1 @@
+curl https://hc-ping.com/$HEALTCHECKS
\ No newline at end of file
diff --git a/datastore/datastore/datastore.py b/datastore/datastore/datastore.py
index 2525c02..b754fc0 100644
--- a/datastore/datastore/datastore.py
+++ b/datastore/datastore/datastore.py
@@ -50,7 +50,7 @@ def __init__(self) -> None:
 
     @log_execution_time(_LOG)
     def get_cached_product_or_read(
-        self, dataset_id: str, product_id: str, query: GeoQuery | None = None
+        self, dataset_id: str, product_id: str,
     ) -> DataCube | Dataset:
         """Get product from the cache instead of loading files indicated in
         the catalog if `metadata_caching` set to `True`.
@@ -81,7 +81,7 @@ def get_cached_product_or_read(
             )
             return self.catalog(CACHE_DIR=self.cache_dir)[dataset_id][
                 product_id
-            ].get(geoquery=query, compute=False).read_chunked()
+            ].read_chunked()
         return self.cache[dataset_id][product_id]
 
     @log_execution_time(_LOG)
@@ -389,7 +389,7 @@ def estimate(
         # NOTE: we always use catalog directly and single product cache
         self._LOG.debug("loading product...")
         # NOTE: for estimation we use cached products
-        kube = self.get_cached_product_or_read(dataset_id, product_id, query=query)
+        kube = self.get_cached_product_or_read(dataset_id, product_id)
         self._LOG.debug("original kube len: %s", len(kube))
         return Datastore._process_query(kube, geoquery, False).nbytes
 
diff --git a/drivers/Dockerfile b/drivers/Dockerfile
index d94bd0c..978eaa1 100644
--- a/drivers/Dockerfile
+++ b/drivers/Dockerfile
@@ -1,6 +1,6 @@
 ARG REGISTRY=rg.fr-par.scw.cloud/geokube
-# ARG TAG=v0.2.6b1
-ARG TAG=latest
+#ARG TAG=v0.2.6b2
+ARG TAG=2024.05.03.10.36
 FROM $REGISTRY/geokube:$TAG
 
 COPY dist/intake_geokube-0.1a0-py3-none-any.whl /
diff --git a/drivers/intake_geokube/afm.py b/drivers/intake_geokube/afm.py
new file mode 100644
index 0000000..62cdc21
--- /dev/null
+++ b/drivers/intake_geokube/afm.py
@@ -0,0 +1,99 @@
+"""geokube driver for intake."""
+
+from typing import Mapping, Optional
+import geokube
+import numpy as np
+import xarray as xr
+from .base import GeokubeSource
+from geokube import open_datacube, open_dataset
+from geokube.core.datacube import DataCube
+
+_PROJECTION = {"grid_mapping_name": "latitude_longitude"}
+
+def postprocess_afm(ds: xr.Dataset, **post_process_chunks):
+    if isinstance(ds, geokube.core.datacube.DataCube):
+        ds = ds.to_xarray()
+    latitude = ds['lat'].values
+    longitude = ds['lon'].values
+    # ds = ds.expand_dims(dim={"latitude": latitude, "longitude": longitude}, axis=(1,0))
+    ds = ds.drop('lat')
+    ds = ds.drop('lon')
+    ds = ds.drop('certainty')
+    deduplicated = ds.expand_dims(dim={"latitude": latitude, "longitude": longitude}, axis=(1, 0))
+    # print(deduplicated.dims)
+    for dim in deduplicated.dims:
+        indexes = {dim: ~deduplicated.get_index(dim).duplicated(keep='first')}
+        deduplicated = deduplicated.isel(indexes)
+    return DataCube.from_xarray(
+        deduplicated.sortby('time').sortby('latitude').sortby('longitude').chunk(post_process_chunks))
+
+def add_projection(dset: xr.Dataset, **kwargs) -> xr.Dataset:
+    """Add projection information to the dataset"""
+    coords = dset.coords
+    coords["crs"] = xr.DataArray(data=np.array(1), attrs=_PROJECTION)
+    for var in dset.data_vars.values():
+        enc = var.encoding
+        enc["grid_mapping"] = "crs"
+    return dset
+
+
+class CMCCAFMSource(GeokubeSource):
+    name = "cmcc_afm_geokube"
+
+    def __init__(
+        self,
+        path: str,
+        pattern: str = None,
+        field_id: str = None,
+        delay_read_cubes: bool = False,
+        metadata_caching: bool = False,
+        metadata_cache_path: str = None,
+        storage_options: dict = None,
+        xarray_kwargs: dict = None,
+        metadata=None,
+        mapping: Optional[Mapping[str, Mapping[str, str]]] = None,
+        load_files_on_persistance: Optional[bool] = True,
+        postprocess_chunk: Optional = None
+    ):
+        self._kube = None
+        self.path = path
+        self.pattern = pattern
+        self.field_id = field_id
+        self.delay_read_cubes = delay_read_cubes
+        self.metadata_caching = metadata_caching
+        self.metadata_cache_path = metadata_cache_path
+        self.storage_options = storage_options
+        self.mapping = mapping
+        self.xarray_kwargs = {} if xarray_kwargs is None else xarray_kwargs
+        self.load_files_on_persistance = load_files_on_persistance
+        self.postprocess_chunk = postprocess_chunk
+        # self.preprocess = preprocess_afm
+        super(CMCCAFMSource, self).__init__(metadata=metadata)
+
+    def _open_dataset(self):
+        if self.pattern is None:
+            self._kube =\
+                postprocess_afm(
+                    open_datacube(
+                        path=self.path,
+                        id_pattern=self.field_id,
+                        metadata_caching=self.metadata_caching,
+                        metadata_cache_path=self.metadata_cache_path,
+                        mapping=self.mapping,
+                        **self.xarray_kwargs,
+                        # preprocess=self.preprocess
+                    ),
+                    **self.postprocess_chunk
+                ).resample('maximum', frequency='1H')
+        else:
+            self._kube = open_dataset(
+                path=self.path,
+                pattern=self.pattern,
+                id_pattern=self.field_id,
+                metadata_caching=self.metadata_caching,
+                metadata_cache_path=self.metadata_cache_path,
+                mapping=self.mapping,
+                **self.xarray_kwargs,
+                # preprocess=self.preprocess
+            ).apply(postprocess_afm,**self.postprocess_chunk).resample('maximum', frequency='1H')
+        return self._kube
diff --git a/drivers/setup.py b/drivers/setup.py
index c99c224..b3a3032 100644
--- a/drivers/setup.py
+++ b/drivers/setup.py
@@ -18,6 +18,7 @@
         "intake.drivers": [
             "geokube_netcdf = intake_geokube.netcdf:NetCDFSource",
             "cmcc_wrf_geokube = intake_geokube.wrf:CMCCWRFSource",
+            "cmcc_afm_geokube = intake_geokube.afm:CMCCAFMSource",
         ]
     },
     classifiers=[
diff --git a/executor/Dockerfile b/executor/Dockerfile
index 3888c93..e3404e5 100644
--- a/executor/Dockerfile
+++ b/executor/Dockerfile
@@ -1,8 +1,18 @@
 ARG REGISTRY=rg.fr-par.scw.cloud/geolake
 ARG TAG=latest
 FROM $REGISTRY/geolake-datastore:$TAG
+
+RUN apt update && apt install -y cron curl
+
+
 WORKDIR /app
 COPY requirements.txt /code/requirements.txt
 RUN pip install --no-cache-dir -r /code/requirements.txt
 COPY app /app
+
+COPY ./healtcheck.* /opt/
+
+RUN chmod +x /opt/healtcheck.sh
+RUN crontab -u root /opt/healtcheck.cron
+
 CMD [ "python", "main.py" ]
diff --git a/executor/app/main.py b/executor/app/main.py
index bf3f494..5daf7e6 100644
--- a/executor/app/main.py
+++ b/executor/app/main.py
@@ -231,7 +231,7 @@ def process(message: Message, compute: bool):
 class Executor(metaclass=LoggableMeta):
     _LOG = logging.getLogger("geokube.Executor")
 
-    def __init__(self, broker, store_path):
+    def __init__(self, broker, store_path, dask_cluster_opts):
         self._store = store_path
         broker_conn = pika.BlockingConnection(
             pika.ConnectionParameters(host=broker, heartbeat=10),
@@ -239,18 +239,12 @@ def __init__(self, broker, store_path):
         self._conn = broker_conn
         self._channel = broker_conn.channel()
         self._db = DBManager()
+        self.dask_cluster_opts = dask_cluster_opts
 
     def create_dask_cluster(self, dask_cluster_opts: dict = None):
         if dask_cluster_opts is None:
-            dask_cluster_opts = {}
-            dask_cluster_opts["scheduler_port"] = int(
-                os.getenv("DASK_SCHEDULER_PORT", 8188)
-            )
-            dask_cluster_opts["processes"] = True
-            port = int(os.getenv("DASK_DASHBOARD_PORT", 8787))
-            dask_cluster_opts["dashboard_address"] = f":{port}"
-            dask_cluster_opts["n_workers"] = None
-            dask_cluster_opts["memory_limit"] = "auto"
+            dask_cluster_opts = self.dask_cluster_opts
+
         self._worker_id = self._db.create_worker(
             status="enabled",
             dask_scheduler_port=dask_cluster_opts["scheduler_port"],
@@ -262,10 +256,11 @@ def create_dask_cluster(self, dask_cluster_opts: dict = None):
             extra={"track_id": self._worker_id},
         )
         dask_cluster = LocalCluster(
-            n_workers=dask_cluster_opts["n_workers"],
+            n_workers=dask_cluster_opts['n_workers'],
             scheduler_port=dask_cluster_opts["scheduler_port"],
             dashboard_address=dask_cluster_opts["dashboard_address"],
             memory_limit=dask_cluster_opts["memory_limit"],
+            threads_per_worker=dask_cluster_opts['thread_per_worker'],
         )
         self._LOG.info(
             "creating Dask Client...", extra={"track_id": self._worker_id}
@@ -356,7 +351,7 @@ def retry_until_timeout(
             )
             status = RequestStatus.FAILED
             fail_reason = f"{type(e).__name__}: {str(e)}"
-        return (location_path, status, fail_reason)
+        return location_path, status, fail_reason
 
     def handle_message(self, connection, channel, delivery_tag, body):
         message: Message = Message(body)
@@ -382,6 +377,9 @@ def handle_message(self, connection, channel, delivery_tag, body):
             message=message,
             compute=False,
         )
+
+        #future = asyncio.run(process(message,compute=False))
+
         location_path, status, fail_reason = self.retry_until_timeout(
             future,
             message=message,
@@ -447,7 +445,19 @@ def get_size(self, location_path):
     executor_types = os.getenv("EXECUTOR_TYPES", "query").split(",")
     store_path = os.getenv("STORE_PATH", ".")
 
-    executor = Executor(broker=broker, store_path=store_path)
+    dask_cluster_opts = {}
+    dask_cluster_opts["scheduler_port"] = int(
+        os.getenv("DASK_SCHEDULER_PORT", 8188)
+    )
+    dask_cluster_opts["processes"] = True
+    port = int(os.getenv("DASK_DASHBOARD_PORT", 8787))
+    dask_cluster_opts["dashboard_address"] = f":{port}"
+    dask_cluster_opts["n_workers"] = int(os.getenv("DASK_N_WORKERS", 1))
+    dask_cluster_opts["memory_limit"] = os.getenv("DASK_MEMORY_LIMIT", "auto")
+    dask_cluster_opts['thread_per_worker'] = int(os.getenv("DASK_THREADS_PER_WORKER", 8))
+
+
+    executor = Executor(broker=broker, store_path=store_path, dask_cluster_opts=dask_cluster_opts)
     print("channel subscribe")
     for etype in executor_types:
         if etype == "query":
diff --git a/executor/healtcheck.cron b/executor/healtcheck.cron
new file mode 100644
index 0000000..e36bf23
--- /dev/null
+++ b/executor/healtcheck.cron
@@ -0,0 +1 @@
+*/10 * * * * bash -c '/opt/healtcheck.sh'
diff --git a/executor/healtcheck.sh b/executor/healtcheck.sh
new file mode 100644
index 0000000..4d8e479
--- /dev/null
+++ b/executor/healtcheck.sh
@@ -0,0 +1 @@
+curl https://hc-ping.com/$HEALTCHECKS
\ No newline at end of file
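
Note (not part of the patch): a minimal usage sketch of the new cmcc_afm_geokube driver registered in drivers/setup.py above. It assumes the drivers package is installed so intake picks up the entry point and exposes its usual auto-generated open_<driver> helper; the path and chunk sizes below are hypothetical, and read_chunked() is the same accessor Datastore calls on catalog entries in datastore.py.

import intake

# Hypothetical path and chunking; postprocess_chunk is forwarded by
# CMCCAFMSource._open_dataset() to postprocess_afm() and then to .chunk().
source = intake.open_cmcc_afm_geokube(
    path="/data/afm/*.nc",
    metadata_caching=False,
    postprocess_chunk={"time": 24},
)

# Materialize the geokube DataCube, as datastore.py does for catalog entries.
cube = source.read_chunked()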