Skip to content

Commit

Permalink
Refactor to use fix_times everywhere
Browse files Browse the repository at this point in the history
  • Loading branch information
mraspaud committed Apr 12, 2024
1 parent 0e82d27 commit a37a639
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 34 deletions.
13 changes: 6 additions & 7 deletions src/pytroll_watchers/local_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@
from pathlib import Path
from urllib.parse import urlunparse

from trollsift import parse
from upath import UPath

from pytroll_watchers.backends.local import listen_to_local_events
from pytroll_watchers.publisher import file_publisher_from_generator
from pytroll_watchers.publisher import file_publisher_from_generator, fix_times, parse_metadata


def file_publisher(fs_config, publisher_config, message_config):
Expand Down Expand Up @@ -59,13 +58,13 @@ def file_generator(directory, observer_type="os", file_pattern=None, protocol=No
"""
file_metadata = {}

pattern = os.path.join(directory, file_pattern) if file_pattern is not None else None
with listen_to_local_events(directory, file_pattern, observer_type) as events:
for path in events:
if file_pattern is not None:
file_metadata = parse(os.path.join(directory, file_pattern), path)
else:
file_metadata = {}
try:
file_metadata = parse_metadata(pattern, path)
except ValueError:
continue
if protocol is not None:
uri = urlunparse((protocol, None, path, None, None, None))
yield UPath(uri, **storage_options), file_metadata
Expand Down
32 changes: 5 additions & 27 deletions src/pytroll_watchers/minio_notification_watcher.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
"""Publish messages based on Minio bucket notifications."""

import datetime

from trollsift import parse
from upath import UPath

from pytroll_watchers.publisher import file_publisher_from_generator
from pytroll_watchers.publisher import file_publisher_from_generator, fix_times, parse_metadata


def file_publisher(fs_config, publisher_config, message_config):
Expand Down Expand Up @@ -51,12 +48,10 @@ def file_generator(endpoint_url, bucket_name, file_pattern=None, storage_options
for item in record["Records"]:
new_bucket_name = item["s3"]["bucket"]["name"]
new_file_name = item["s3"]["object"]["key"]
if file_pattern is not None:
try:
file_metadata = parse(file_pattern, new_file_name)
fix_times(file_metadata)
except ValueError:
continue
try:
file_metadata = parse_metadata(file_pattern, new_file_name)
except ValueError:
continue

path = UPath(f"s3://{new_bucket_name}/{new_file_name}", **storage_options)
yield path, file_metadata
Expand All @@ -81,20 +76,3 @@ def _record_generator(endpoint_url, bucket_name, profile=None):
) as events:
for event in events:
yield event


def fix_times(info):
    """Fix times so that date and time components are combined, and start time is before end time.

    The metadata dictionary is modified in place and nothing is returned.

    A date key is merged into its matching time key and then removed. When there
    is an end time but no end date, the start date is used as the end date. Date
    keys that have no matching time key are left untouched instead of raising a
    ``KeyError``.

    Args:
        info: Metadata dictionary. The values stored under ``start_date``,
            ``start_time``, ``end_date`` and ``end_time`` (each optional) must
            provide ``.date()`` / ``.time()``, i.e. be datetime objects.
    """
    if "start_date" in info:
        if "end_time" in info:
            # Without an explicit end date, the end time is taken to be on the start date.
            info.setdefault("end_date", info["start_date"])
        if "start_time" in info:
            info["start_time"] = datetime.datetime.combine(info["start_date"].date(),
                                                           info["start_time"].time())
            del info["start_date"]
    if "end_date" in info and "end_time" in info:
        info["end_time"] = datetime.datetime.combine(info["end_date"].date(),
                                                     info["end_time"].time())
        del info["end_date"]
    if "start_time" in info and "end_time" in info:
        # An end time earlier than the start time means the interval crosses midnight.
        while info["start_time"] > info["end_time"]:
            info["end_time"] += datetime.timedelta(days=1)
28 changes: 28 additions & 0 deletions src/pytroll_watchers/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

from contextlib import closing, suppress
from copy import deepcopy
import datetime

from posttroll.message import Message
from posttroll.publisher import create_publisher_from_dict_config
from trollsift import parse


def file_publisher_from_generator(generator, publisher_config, message_config):
Expand All @@ -29,3 +31,29 @@ def file_publisher_from_generator(generator, publisher_config, message_config):
amended_message_config["data"].update(file_metadata)
msg = Message(**amended_message_config)
publisher.send(str(msg))


def fix_times(info):
    """Fix times so that date and time components are combined, and start time is before end time.

    The metadata dictionary is modified in place and nothing is returned.

    A date key is merged into its matching time key and then removed. When there
    is an end time but no end date, the start date is used as the end date. Date
    keys that have no matching time key are left untouched instead of raising a
    ``KeyError``.

    Args:
        info: Metadata dictionary. The values stored under ``start_date``,
            ``start_time``, ``end_date`` and ``end_time`` (each optional) must
            provide ``.date()`` / ``.time()``, i.e. be datetime objects.
    """
    if "start_date" in info:
        if "end_time" in info:
            # Without an explicit end date, the end time is taken to be on the start date.
            info.setdefault("end_date", info["start_date"])
        if "start_time" in info:
            info["start_time"] = datetime.datetime.combine(info["start_date"].date(),
                                                           info["start_time"].time())
            del info["start_date"]
    if "end_date" in info and "end_time" in info:
        info["end_time"] = datetime.datetime.combine(info["end_date"].date(),
                                                     info["end_time"].time())
        del info["end_date"]
    if "start_time" in info and "end_time" in info:
        # An end time earlier than the start time means the interval crosses midnight.
        while info["start_time"] > info["end_time"]:
            info["end_time"] += datetime.timedelta(days=1)

def parse_metadata(file_pattern, path):
    """Extract the metadata encoded in *path* according to *file_pattern*.

    Args:
        file_pattern: The pattern handed to ``parse`` to decode the path, or
            None to skip parsing altogether.
        path: The path (or filename) to extract the metadata from.

    Returns:
        A dictionary with the parsed metadata, after ``fix_times`` has been
        applied to it, or an empty dictionary when *file_pattern* is None.
    """
    if file_pattern is None:
        return {}
    metadata = parse(file_pattern, path)
    fix_times(metadata)
    return metadata

0 comments on commit a37a639

Please sign in to comment.