Add a watcher for DHuS instances
mraspaud committed Apr 29, 2024
1 parent b42fd4f commit c6876e4
Showing 7 changed files with 372 additions and 8 deletions.
13 changes: 12 additions & 1 deletion docs/source/index.rst
@@ -122,10 +122,21 @@ Minio bucket notification watcher
:members:

Copernicus dataspace watcher
---------------------------------
----------------------------
.. automodule:: pytroll_watchers.dataspace_watcher
:members:

EUMETSAT datastore watcher
--------------------------
.. automodule:: pytroll_watchers.datastore_watcher
:members:

DHuS watcher
------------
.. automodule:: pytroll_watchers.dhus_watcher
:members:


Testing utilities
-----------------
.. automodule:: pytroll_watchers.testing
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -5,7 +5,7 @@ description = "Utility functions and scripts to watch for new files on local or
authors = [
{ name = "Martin Raspaud", email = "[email protected]" }
]
dependencies = ["universal-pathlib", "trollsift", "pyyaml"]
dependencies = ["universal-pathlib", "trollsift", "pyyaml", "geojson"]
readme = "README.md"
requires-python = ">= 3.10"
license = {file = "LICENSE.txt"}
@@ -22,6 +22,7 @@ local = ["watchdog"]
publishing = ["posttroll"]
ssh = ["paramiko"]
dataspace = ["oauthlib", "requests_oauthlib", "s3fs"]
dhus = ["defusedxml"]

[build-system]
requires = ["hatchling", "hatch-vcs"]
150 changes: 150 additions & 0 deletions src/pytroll_watchers/dhus_watcher.py
@@ -0,0 +1,150 @@
"""Watcher for DHuS instances.
For more information about DHuS, check out
https://sentineldatahub.github.io/DataHubSystem/about.html
"""

import datetime as dt
import logging
from contextlib import suppress
from urllib.parse import urljoin

import requests
from geojson import Polygon
from upath import UPath

from pytroll_watchers.dataspace_watcher import run_every
from pytroll_watchers.publisher import file_publisher_from_generator

logger = logging.getLogger(__name__)


def file_publisher(fs_config, publisher_config, message_config):
"""Publish files coming from local filesystem events.
Args:
fs_config: the configuration for the filesystem watching, will be passed as argument to `file_generator`.
publisher_config: The configuration dictionary to pass to the posttroll publishing functions.
message_config: The information needed to complete the posttroll message generation. Will be amended
with the file metadata, and passed directly to posttroll's Message constructor.
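
    Example:
        An illustrative configuration; the server address, publisher name and topic below are made up::

            fs_config = dict(server="https://dhus.example.org",
                             filter_params=["substringof('IW_GRDH',Name)"],
                             polling_interval=dict(minutes=10))
            publisher_config = dict(name="dhus_watcher")
            message_config = dict(subject="/segment/sar/iw", atype="file")
            file_publisher(fs_config, publisher_config, message_config)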
"""
logger.info(f"Starting watch on dhus for '{fs_config['filter_params']}'")
generator = file_generator(**fs_config)
return file_publisher_from_generator(generator, publisher_config, message_config)


def file_generator(server, filter_params, polling_interval, start_from=None):
"""Generate new objects by polling a DHuS instance.
Args:
server: the DHuS server to use.
filter_params: the list of filter parameters to use for narrowing the data to poll. For example, to poll IW sar
data, it can be `substringof('IW_GRDH',Name)`. For more information of the filter parameters, check:
https://scihub.copernicus.eu/twiki/do/view/SciHubUserGuide/ODataAPI#filter
polling_interval: the interval (timedelta object or kwargs to timedelta) at which the DHUS will be polled.
start_from: how far back in time to fetch the data the first time. This is helpful for the first iteration of
the generator, so that data from the past can be fetched, to fill a possible gap. Default to 0, meaning
nothing older than when the generator starts will be fetched. Same format accepted as polling_interval.
Yields:
Tuples of UPath (http) and metadata.
Note:
As this watcher uses requests, the authentication information should be stored in a .netrc file.
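
    Example:
        An illustrative call; the server address below is made up::

            from pytroll_watchers.dhus_watcher import file_generator

            generator = file_generator("https://dhus.example.org",
                                        filter_params=["substringof('IW_GRDH',Name)"],
                                        polling_interval=dict(minutes=10),
                                        start_from=dict(hours=6))
            for path, metadata in generator:
                print(path, metadata["platform_name"])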
"""
with suppress(TypeError):
polling_interval = dt.timedelta(**polling_interval)
with suppress(TypeError):
start_from = dt.timedelta(**start_from)
if start_from is None:
start_from = dt.timedelta(0)

last_pub_date = dt.datetime.now(dt.timezone.utc) - start_from
for next_check in run_every(polling_interval):
generator = generate_download_links_since(server, filter_params, last_pub_date)
for path, metadata in generator:
last_pub_date = update_last_publication_date(last_pub_date, metadata)
yield path, metadata
logger.info("Finished polling.")
if next_check > dt.datetime.now(dt.timezone.utc):
logger.info(f"Next iteration at {next_check}")


def update_last_publication_date(last_publication_date, metadata):
"""Update the last publication data based on the metadata."""
publication_date = metadata.pop("ingestion_date")
if publication_date > last_publication_date:
last_publication_date = publication_date
return last_publication_date


def generate_download_links_since(server, filter_params, last_publication_date):
"""Generate download links for the data published since `last_publication_date`."""
last_publication_date = last_publication_date.astimezone(dt.timezone.utc)
    # remove the timezone info, as DHuS considers all times UTC
pub_string = last_publication_date.isoformat(timespec="milliseconds")[:-6]
filter_params = filter_params + [f"IngestionDate gt datetime'{pub_string}'"]
yield from generate_download_links(server, filter_params)


def generate_download_links(server, filter_params):
"""Generate download links.
The filter params we can use are defined here: https://scihub.copernicus.eu/twiki/do/view/SciHubUserGuide/ODataAPI#filter
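
    For example, with ``server="https://dhus.example.org"`` (a made-up address) and
    ``filter_params=["substringof('IW_GRDH',Name)"]``, the requested URL becomes::

        https://dhus.example.org/odata/v1/Products?$format=json&$filter=substringof('IW_GRDH',Name)&$expand=Attributes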
"""
filter_string = " and ".join(filter_params)

url = urljoin(server, f"/odata/v1/Products?$format=json&$filter={filter_string}&$expand=Attributes")

response = requests.get(url, timeout=60)
response.raise_for_status()

entries = response.json()

for entry in entries["d"]["results"]:
mda = dict()
path = UPath(entry["__metadata"]["media_src"])
mda["boundary"] = extract_boundary(entry)
results_dict = construct_results_dict(entry)
mda["platform_name"] = results_dict["Satellite name"].capitalize() + results_dict["Satellite number"]
mda["sensor"] = results_dict["Instrument"]
mda["ingestion_date"] = dt.datetime.fromisoformat(results_dict["Ingestion Date"])
mda["product_type"] = results_dict["Product type"]
mda["start_time"] = dt.datetime.fromisoformat(results_dict["Sensing start"])
mda["end_time"] = dt.datetime.fromisoformat(results_dict["Sensing stop"])
mda["orbit_number"] = int(results_dict["Orbit number (start)"])

mda["checksum"] = dict(algorithm=entry["Checksum"]["Algorithm"], hash=entry["Checksum"]["Value"])
mda["size"] = int(entry["ContentLength"])
yield path, mda

def construct_results_dict(entry):
"""Construct a dict from then "results" item in entry."""
results = entry["Attributes"]["results"]
results_dict = {result["Id"]: result["Value"] for result in results}
return results_dict


def extract_boundary(entry):
"""Extract the boundary from the entry metadata."""
gml, nsmap = read_gml(entry["ContentGeometry"])
boundary_text = gml.find("gml:outerBoundaryIs/gml:LinearRing/gml:coordinates", namespaces=nsmap).text
boundary_list = (coords.split(",") for coords in boundary_text.strip().split(" "))
boundary = Polygon([(float(lon), float(lat)) for (lat, lon) in boundary_list])
return boundary

def read_gml(gml_string):
"""Read the gml string."""
from xml.etree import ElementTree

from defusedxml.ElementTree import DefusedXMLParser
parser = ElementTree.XMLPullParser(["start-ns", "end"], _parser=DefusedXMLParser())
parser.feed(gml_string)

nsmap = dict()

    # "start-ns" events yield (prefix, uri) pairs that fill in the namespace mapping; when the
    # loop ends, `elem` is the last element closed, i.e. the root of the document.
    for event, elem in parser.read_events():
        if event == "start-ns":
            nsmap[elem[0]] = elem[1]
return elem, nsmap
13 changes: 7 additions & 6 deletions src/pytroll_watchers/main_interface.py
@@ -5,6 +5,7 @@

import yaml

from pytroll_watchers.dhus_watcher import file_publisher as dhus_publisher
from pytroll_watchers.local_watcher import file_generator as local_generator
from pytroll_watchers.local_watcher import file_publisher as local_publisher
from pytroll_watchers.minio_notification_watcher import file_generator as minio_generator
@@ -25,6 +26,8 @@ def get_publisher_for_backend(backend):
return minio_publisher
elif backend == "local":
return local_publisher
elif backend == "dhus":
return dhus_publisher
else:
raise ValueError(f"Unknown backend {backend}.")

@@ -54,12 +57,10 @@ def publish_from_config(config):
        config: a dictionary containing the `backend` string (`local`, `minio` or `dhus`), and the `fs_config`,
            `publisher_config` and `message_config` dictionaries.
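
    Example:
        An illustrative config for the dhus backend; the server address and topic are made up::

            config = dict(backend="dhus",
                          fs_config=dict(server="https://dhus.example.org",
                                         filter_params=["substringof('IW_GRDH',Name)"],
                                         polling_interval=dict(minutes=10)),
                          publisher_config=dict(name="dhus_watcher"),
                          message_config=dict(subject="/segment/sar/iw", atype="file"))
            publish_from_config(config)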
"""
if config["backend"] == "local":
return local_publisher(config["fs_config"], config["publisher_config"], config["message_config"])
elif config["backend"] == "minio":
return minio_publisher(config["fs_config"], config["publisher_config"], config["message_config"])
else:
raise ValueError(f"Unknown backend {config['backend']}")
backend = config["backend"]
publisher = get_publisher_for_backend(backend)
return publisher(config["fs_config"], config["publisher_config"], config["message_config"])



def cli(args=None):
1 change: 1 addition & 0 deletions src/pytroll_watchers/publisher.py
@@ -46,6 +46,7 @@ def file_publisher_from_generator(generator, publisher_config, message_config):
with closing(publisher):
for file_item, file_metadata in generator:
amended_message_config = deepcopy(message_config)
amended_message_config.setdefault("data", {})
amended_message_config["data"]["uri"] = file_item.as_uri()
amended_message_config["data"]["uid"] = file_item.name
with suppress(AttributeError):