Merge pull request #36 from CMCC-Foundation/hotfix_v0.1a1
Hotfix v0.1a1
gtramonte authored Aug 5, 2024
2 parents 5eef173 + 67e6c9b commit a06f748
Showing 12 changed files with 153 additions and 18 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -112,3 +112,4 @@ venv.bak/
_catalogs/
_old/

.DS_Store
10 changes: 10 additions & 0 deletions api/Dockerfile
@@ -1,9 +1,19 @@
ARG REGISTRY=rg.fr-par.scw.cloud/geolake
ARG TAG=latest
FROM $REGISTRY/geolake-datastore:$TAG

RUN apt update && apt install -y cron curl

WORKDIR /app
COPY requirements.txt /code/requirements.txt
RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt
COPY app /app
EXPOSE 80

COPY ./healtcheck.* /opt/

RUN chmod +x /opt/healtcheck.sh
RUN crontab -u root /opt/healtcheck.cron

CMD ["uvicorn", "app.main:app", "--proxy-headers", "--host", "0.0.0.0", "--port", "80"]

1 change: 1 addition & 0 deletions api/healtcheck.cron
@@ -0,0 +1 @@
*/10 * * * * bash -c '/opt/healtcheck.sh'
1 change: 1 addition & 0 deletions api/healtcheck.sh
@@ -0,0 +1 @@
curl https://hc-ping.com/$HEALTCHECKS
6 changes: 3 additions & 3 deletions datastore/datastore/datastore.py
@@ -50,7 +50,7 @@ def __init__(self) -> None:

@log_execution_time(_LOG)
def get_cached_product_or_read(
self, dataset_id: str, product_id: str, query: GeoQuery | None = None
self, dataset_id: str, product_id: str,
) -> DataCube | Dataset:
"""Get product from the cache instead of loading files indicated in
the catalog if `metadata_caching` set to `True`.
@@ -81,7 +81,7 @@ def get_cached_product_or_read(
)
return self.catalog(CACHE_DIR=self.cache_dir)[dataset_id][
product_id
].get(geoquery=query, compute=False).read_chunked()
].read_chunked()
return self.cache[dataset_id][product_id]

@log_execution_time(_LOG)
@@ -389,7 +389,7 @@ def estimate(
# NOTE: we always use catalog directly and single product cache
self._LOG.debug("loading product...")
# NOTE: for estimation we use cached products
kube = self.get_cached_product_or_read(dataset_id, product_id, query=query)
kube = self.get_cached_product_or_read(dataset_id, product_id)
self._LOG.debug("original kube len: %s", len(kube))
return Datastore._process_query(kube, geoquery, False).nbytes

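Editor's note (not part of the diff): with the `query` argument removed, `get_cached_product_or_read` always returns the full cached product, and any GeoQuery filtering now happens downstream. A minimal sketch of the resulting call pattern in `estimate`, using only names that appear in the diff:

    # read the whole cached product; no query is pushed into the cached read anymore
    kube = self.get_cached_product_or_read(dataset_id, product_id)
    # subsetting is applied afterwards, when the size is estimated
    size_bytes = Datastore._process_query(kube, geoquery, False).nbytes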
4 changes: 2 additions & 2 deletions drivers/Dockerfile
@@ -1,6 +1,6 @@
ARG REGISTRY=rg.fr-par.scw.cloud/geokube
# ARG TAG=v0.2.6b1
ARG TAG=latest
#ARG TAG=v0.2.6b2
ARG TAG=2024.05.03.10.36
FROM $REGISTRY/geokube:$TAG

COPY dist/intake_geokube-0.1a0-py3-none-any.whl /
99 changes: 99 additions & 0 deletions drivers/intake_geokube/afm.py
@@ -0,0 +1,99 @@
"""geokube driver for intake."""

from typing import Mapping, Optional
import geokube
import numpy as np
import xarray as xr
from .base import GeokubeSource
from geokube import open_datacube, open_dataset
from geokube.core.datacube import DataCube

_PROJECTION = {"grid_mapping_name": "latitude_longitude"}

def postprocess_afm(ds: xr.Dataset, **post_process_chunks):
if isinstance(ds, geokube.core.datacube.DataCube):
ds = ds.to_xarray()
latitude = ds['lat'].values
longitude = ds['lon'].values
# ds = ds.expand_dims(dim={"latitude": latitude, "longitude": longitude}, axis=(1,0))
ds = ds.drop('lat')
ds = ds.drop('lon')
ds = ds.drop('certainty')
deduplicated = ds.expand_dims(dim={"latitude": latitude, "longitude": longitude}, axis=(1, 0))
# print(deduplicated.dims)
for dim in deduplicated.dims:
indexes = {dim: ~deduplicated.get_index(dim).duplicated(keep='first')}
deduplicated = deduplicated.isel(indexes)
return DataCube.from_xarray(
deduplicated.sortby('time').sortby('latitude').sortby('longitude').chunk(post_process_chunks))

def add_projection(dset: xr.Dataset, **kwargs) -> xr.Dataset:
"""Add projection information to the dataset"""
coords = dset.coords
coords["crs"] = xr.DataArray(data=np.array(1), attrs=_PROJECTION)
for var in dset.data_vars.values():
enc = var.encoding
enc["grid_mapping"] = "crs"
return dset


class CMCCAFMSource(GeokubeSource):
name = "cmcc_afm_geokube"

def __init__(
self,
path: str,
pattern: str = None,
field_id: str = None,
delay_read_cubes: bool = False,
metadata_caching: bool = False,
metadata_cache_path: str = None,
storage_options: dict = None,
xarray_kwargs: dict = None,
metadata=None,
mapping: Optional[Mapping[str, Mapping[str, str]]] = None,
load_files_on_persistance: Optional[bool] = True,
postprocess_chunk: Optional = None
):
self._kube = None
self.path = path
self.pattern = pattern
self.field_id = field_id
self.delay_read_cubes = delay_read_cubes
self.metadata_caching = metadata_caching
self.metadata_cache_path = metadata_cache_path
self.storage_options = storage_options
self.mapping = mapping
self.xarray_kwargs = {} if xarray_kwargs is None else xarray_kwargs
self.load_files_on_persistance = load_files_on_persistance
self.postprocess_chunk = postprocess_chunk
# self.preprocess = preprocess_afm
super(CMCCAFMSource, self).__init__(metadata=metadata)

def _open_dataset(self):
if self.pattern is None:
self._kube =\
postprocess_afm(
open_datacube(
path=self.path,
id_pattern=self.field_id,
metadata_caching=self.metadata_caching,
metadata_cache_path=self.metadata_cache_path,
mapping=self.mapping,
**self.xarray_kwargs,
# preprocess=self.preprocess
),
**self.postprocess_chunk
).resample('maximum', frequency='1H')
else:
self._kube = open_dataset(
path=self.path,
pattern=self.pattern,
id_pattern=self.field_id,
metadata_caching=self.metadata_caching,
metadata_cache_path=self.metadata_cache_path,
mapping=self.mapping,
**self.xarray_kwargs,
# preprocess=self.preprocess
).apply(postprocess_afm,**self.postprocess_chunk).resample('maximum', frequency='1H')
return self._kube
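
Editor's note: a hypothetical usage sketch of the new driver (not part of the commit). The constructor arguments mirror `CMCCAFMSource.__init__` above; the path, cache location, and chunking values are made up, and `read()` is assumed to be the standard intake entry point inherited from `GeokubeSource`:

    from intake_geokube.afm import CMCCAFMSource

    source = CMCCAFMSource(
        path="/data/afm/*.nc",                 # hypothetical input files
        metadata_caching=True,
        metadata_cache_path="/tmp/afm_cache",  # hypothetical cache location
        postprocess_chunk={"time": 24},        # forwarded to postprocess_afm as chunking
    )
    cube = source.read()  # datacube resampled to hourly maxima by _open_dataset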
1 change: 1 addition & 0 deletions drivers/setup.py
@@ -18,6 +18,7 @@
"intake.drivers": [
"geokube_netcdf = intake_geokube.netcdf:NetCDFSource",
"cmcc_wrf_geokube = intake_geokube.wrf:CMCCWRFSource",
"cmcc_afm_geokube = intake_geokube.afm:CMCCAFMSource",
]
},
classifiers=[
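Editor's note: registering the `cmcc_afm_geokube` entry point is what lets intake resolve the new AFM driver by name. A hypothetical check (assumes intake's standard entry-point discovery and its `intake.registry` mapping; not part of the commit):

    import intake

    # after installing the package, the driver should be resolvable by name
    print("cmcc_afm_geokube" in intake.registry)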
10 changes: 10 additions & 0 deletions executor/Dockerfile
@@ -1,8 +1,18 @@
ARG REGISTRY=rg.fr-par.scw.cloud/geolake
ARG TAG=latest
FROM $REGISTRY/geolake-datastore:$TAG

RUN apt update && apt install -y cron curl


WORKDIR /app
COPY requirements.txt /code/requirements.txt
RUN pip install --no-cache-dir -r /code/requirements.txt
COPY app /app

COPY ./healtcheck.* /opt/

RUN chmod +x /opt/healtcheck.sh
RUN crontab -u root /opt/healtcheck.cron

CMD [ "python", "main.py" ]
36 changes: 23 additions & 13 deletions executor/app/main.py
@@ -231,26 +231,20 @@ def process(message: Message, compute: bool):
class Executor(metaclass=LoggableMeta):
_LOG = logging.getLogger("geokube.Executor")

def __init__(self, broker, store_path):
def __init__(self, broker, store_path, dask_cluster_opts):
self._store = store_path
broker_conn = pika.BlockingConnection(
pika.ConnectionParameters(host=broker, heartbeat=10),
)
self._conn = broker_conn
self._channel = broker_conn.channel()
self._db = DBManager()
self.dask_cluster_opts = dask_cluster_opts

def create_dask_cluster(self, dask_cluster_opts: dict = None):
if dask_cluster_opts is None:
dask_cluster_opts = {}
dask_cluster_opts["scheduler_port"] = int(
os.getenv("DASK_SCHEDULER_PORT", 8188)
)
dask_cluster_opts["processes"] = True
port = int(os.getenv("DASK_DASHBOARD_PORT", 8787))
dask_cluster_opts["dashboard_address"] = f":{port}"
dask_cluster_opts["n_workers"] = None
dask_cluster_opts["memory_limit"] = "auto"
dask_cluster_opts = self.dask_cluster_opts

self._worker_id = self._db.create_worker(
status="enabled",
dask_scheduler_port=dask_cluster_opts["scheduler_port"],
@@ -262,10 +256,11 @@ def create_dask_cluster(self, dask_cluster_opts: dict = None):
extra={"track_id": self._worker_id},
)
dask_cluster = LocalCluster(
n_workers=dask_cluster_opts["n_workers"],
n_workers=dask_cluster_opts['n_workers'],
scheduler_port=dask_cluster_opts["scheduler_port"],
dashboard_address=dask_cluster_opts["dashboard_address"],
memory_limit=dask_cluster_opts["memory_limit"],
threads_per_worker=dask_cluster_opts['thread_per_worker'],
)
self._LOG.info(
"creating Dask Client...", extra={"track_id": self._worker_id}
@@ -356,7 +351,7 @@ def retry_until_timeout(
)
status = RequestStatus.FAILED
fail_reason = f"{type(e).__name__}: {str(e)}"
return (location_path, status, fail_reason)
return location_path, status, fail_reason

def handle_message(self, connection, channel, delivery_tag, body):
message: Message = Message(body)
@@ -382,6 +377,9 @@ def handle_message(self, connection, channel, delivery_tag, body):
message=message,
compute=False,
)

#future = asyncio.run(process(message,compute=False))

location_path, status, fail_reason = self.retry_until_timeout(
future,
message=message,
@@ -447,7 +445,19 @@ def get_size(self, location_path):
executor_types = os.getenv("EXECUTOR_TYPES", "query").split(",")
store_path = os.getenv("STORE_PATH", ".")

executor = Executor(broker=broker, store_path=store_path)
dask_cluster_opts = {}
dask_cluster_opts["scheduler_port"] = int(
os.getenv("DASK_SCHEDULER_PORT", 8188)
)
dask_cluster_opts["processes"] = True
port = int(os.getenv("DASK_DASHBOARD_PORT", 8787))
dask_cluster_opts["dashboard_address"] = f":{port}"
dask_cluster_opts["n_workers"] = int(os.getenv("DASK_N_WORKERS", 1))
dask_cluster_opts["memory_limit"] = os.getenv("DASK_MEMORY_LIMIT", "auto")
dask_cluster_opts['thread_per_worker'] = int(os.getenv("DASK_THREADS_PER_WORKER", 8))


executor = Executor(broker=broker, store_path=store_path, dask_cluster_opts=dask_cluster_opts)
print("channel subscribe")
for etype in executor_types:
if etype == "query":
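Editor's note: the Dask cluster configuration is now assembled once in the main block from environment variables and handed to `Executor`, instead of being rebuilt inside `create_dask_cluster`. A minimal sketch of the equivalent `LocalCluster` call using the defaults shown in the diff (the concrete values are illustrative, not additional defaults):

    from dask.distributed import LocalCluster

    # mirrors dask_cluster_opts built from the DASK_* environment variables above
    cluster = LocalCluster(
        n_workers=1,                 # DASK_N_WORKERS
        scheduler_port=8188,         # DASK_SCHEDULER_PORT
        dashboard_address=":8787",   # DASK_DASHBOARD_PORT
        memory_limit="auto",         # DASK_MEMORY_LIMIT
        threads_per_worker=8,        # DASK_THREADS_PER_WORKER
    )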
1 change: 1 addition & 0 deletions executor/healtcheck.cron
@@ -0,0 +1 @@
*/10 * * * * bash -c '/opt/healtcheck.sh'
1 change: 1 addition & 0 deletions executor/healtcheck.sh
@@ -0,0 +1 @@
curl https://hc-ping.com/$HEALTCHECKS
