Skip to content

Commit

Permalink
Add support for date-based restart frequency
Browse files Browse the repository at this point in the history
- Move logic for deciding what restarts to prune to Experiment.prune_restarts()
- Extend access-om2 and mom driver to parse restart files for a cftime datetime
- Add functionality to calendar.py to parse date-based frequency values and add these intervals to different datetimes
  • Loading branch information
Jo Basevi committed Sep 15, 2023
1 parent 2d16f18 commit e3b2281
Show file tree
Hide file tree
Showing 8 changed files with 338 additions and 22 deletions.
111 changes: 110 additions & 1 deletion payu/calendar.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from dateutil.relativedelta import relativedelta
import datetime
import re

from dateutil.relativedelta import relativedelta
import cftime

NOLEAP, GREGORIAN = range(2)

Expand Down Expand Up @@ -86,3 +89,109 @@ def calculate_leapdays(init_date, final_date):
# TODO: Internal date correction (e.g. init_date is 1-March or later)

return datetime.timedelta(days=leap_days)


def add_year_offset_to_datetime(initial_dt, n):
    """Shift a datetime by n whole years.

    Args:
        initial_dt: A standard ``datetime.datetime`` or a ``cftime.datetime``.
        n: Number of years to add.

    Returns:
        The shifted datetime. Standard datetimes are shifted with a
        ``relativedelta``; cftime datetimes have only their year field
        replaced. ``None`` is returned for any other input type.
    """
    if isinstance(initial_dt, datetime.datetime):
        # Standard datetime calendar
        year_shift = relativedelta(years=n)
        return initial_dt + year_shift

    if isinstance(initial_dt, cftime.datetime):
        # Non-standard calendars: all other fields are left untouched
        target_year = initial_dt.year + n
        return initial_dt.replace(year=target_year)

    return None


def add_year_start_offset_to_datetime(initial_dt, n):
    """Return the start of the year that is n years after the initial datetime.

    The result is 1 January at 00:00:00.000000 of year
    ``initial_dt.year + n``.

    Args:
        initial_dt: A standard ``datetime.datetime`` or a ``cftime.datetime``.
        n: Number of years to add before snapping to the start of the year.

    Returns:
        The start-of-year datetime, or ``None`` if ``initial_dt`` is neither
        of the supported types.
    """
    if isinstance(initial_dt, datetime.datetime):
        # Plural keyword (years) is a relative shift; the singular keywords
        # set those fields to absolute values. Microseconds are reset as
        # well so the result is exactly the start of the year.
        return initial_dt + relativedelta(
            years=n, month=1, day=1,
            hour=0, minute=0, second=0, microsecond=0
        )

    if isinstance(initial_dt, cftime.datetime):
        return initial_dt.replace(
            year=initial_dt.year + n, month=1, day=1,
            hour=0, minute=0, second=0, microsecond=0
        )

    return None


def add_month_start_offset_to_datetime(initial_dt, n):
    """Return the start of the month that is n months after the initial datetime.

    The result is the first day of the target month at 00:00:00.000000.

    Args:
        initial_dt: A standard ``datetime.datetime`` or a ``cftime.datetime``.
        n: Number of months to add before snapping to the start of the month.

    Returns:
        The start-of-month datetime, or ``None`` if ``initial_dt`` is neither
        of the supported types.
    """
    if isinstance(initial_dt, datetime.datetime):
        # Plural keyword (months) is a relative shift; the singular keywords
        # set those fields to absolute values. Microseconds are reset as
        # well so the result is exactly the start of the month.
        return initial_dt + relativedelta(
            months=n, day=1,
            hour=0, minute=0, second=0, microsecond=0
        )

    if isinstance(initial_dt, cftime.datetime):
        # Work with a zero-based month index so that floor division gives
        # the number of years carried over (also for negative n).
        years_carried, month_index = divmod(initial_dt.month + n - 1, 12)

        return initial_dt.replace(
            year=initial_dt.year + years_carried, month=month_index + 1,
            day=1, hour=0, minute=0, second=0, microsecond=0
        )

    return None


def add_month_offset_to_datetime(initial_dt, n):
    """Shift a datetime by n whole months.

    Args:
        initial_dt: A standard ``datetime.datetime`` or a ``cftime.datetime``.
        n: Number of months to add.

    Returns:
        The shifted datetime. For cftime datetimes on the "noleap" and
        "all_leap" calendars the day is clamped to the last day of the
        target month; for any other cftime calendar the day is kept as is.
        ``None`` is returned for any other input type.
    """
    if isinstance(initial_dt, datetime.datetime):
        return initial_dt + relativedelta(months=n)

    if isinstance(initial_dt, cftime.datetime):
        # Zero-based month index: floor division yields the carried years
        years_carried, month_index = divmod(initial_dt.month + n - 1, 12)
        target_year = initial_dt.year + years_carried
        target_month = month_index + 1
        target_day = initial_dt.day

        calendar = initial_dt.calendar
        if calendar in ("noleap", "all_leap"):
            # Every year is a leap year on the all_leap calendar
            february = 29 if calendar == "all_leap" else 28
            month_lengths = (31, february, 31, 30, 31, 30,
                             31, 31, 30, 31, 30, 31)
            target_day = min(target_day, month_lengths[month_index])

        return initial_dt.replace(year=target_year, month=target_month,
                                  day=target_day)

    return None


def add_timedelta_fn(timedelta):
    """Build a function that adds ``n * timedelta`` to a datetime.

    Args:
        timedelta: The fixed interval that one unit of the offset represents.

    Returns:
        A function taking ``initial_dt`` and ``n`` which returns
        ``initial_dt + n * timedelta``.
    """
    def add_scaled_timedelta(initial_dt, n):
        # Relies on the datetime type supporting addition of a timedelta
        return initial_dt + n * timedelta

    return add_scaled_timedelta


class DatetimeOffset:
    """A date offset described by a unit and an integer magnitude.

    Supported units are 'Y', 'YS', 'M', 'MS', 'W', 'D', 'H', 'T' and 'S'.
    """

    def __init__(self, unit, magnitude):
        """Create an offset of ``magnitude`` times ``unit``.

        Args:
            unit: One of the supported offset unit strings.
            magnitude: Multiplier applied to the unit.

        Raises:
            ValueError: If ``unit`` is not a supported offset unit.
        """
        # Dictionary of 'offset units' to functions which take an initial_dt
        # (standard or cftime datetime) and n (multiplier of the offset
        # unit), and return the datetime with the offset added
        supported_datetime_offsets = {
            'Y': add_year_offset_to_datetime,
            'YS': add_year_start_offset_to_datetime,
            'M': add_month_offset_to_datetime,
            'MS': add_month_start_offset_to_datetime,
            'W': add_timedelta_fn(datetime.timedelta(weeks=1)),
            'D': add_timedelta_fn(datetime.timedelta(days=1)),
            'H': add_timedelta_fn(datetime.timedelta(hours=1)),
            'T': add_timedelta_fn(datetime.timedelta(minutes=1)),
            'S': add_timedelta_fn(datetime.timedelta(seconds=1))
        }
        # Raise explicitly instead of asserting: assertions are removed
        # when Python runs with -O, which would let a bad unit through
        if unit not in supported_datetime_offsets:
            raise ValueError(
                f"payu: error: unsupported datetime offset: {unit}"
            )
        self.unit = unit
        self.magnitude = magnitude
        self.add_offset_to_datetime = supported_datetime_offsets[unit]

    def add_to_datetime(self, initial_dt):
        """Return ``initial_dt`` with this offset added.

        Args:
            initial_dt: A standard ``datetime.datetime`` or a
                ``cftime.datetime``.

        Returns:
            The datetime with the offset added. For 'M' and 'Y' offsets a
            cftime datetime that is datetime-compatible is converted first,
            so the result is then a standard ``datetime.datetime``.

        Raises:
            TypeError: If ``initial_dt`` is neither a cftime nor a standard
                datetime.
            ValueError: If the offset is 'M' or 'Y' and the cftime calendar
                is neither datetime-compatible nor one of "360_day",
                "noleap" or "all_leap".
        """
        if not isinstance(initial_dt, (cftime.datetime, datetime.datetime)):
            raise TypeError(f"Invalid initial datetime type: {type(initial_dt)}. Expected types: cftime.datetime or datetime.datetime")

        if self.unit in ['M', 'Y'] and isinstance(initial_dt, cftime.datetime):
            if initial_dt.datetime_compatible:
                # Transform cftime datetime to standard datetime, keeping
                # the sub-second part so no precision is lost
                initial_dt = datetime.datetime(
                    initial_dt.year, initial_dt.month, initial_dt.day,
                    initial_dt.hour, initial_dt.minute, initial_dt.second,
                    initial_dt.microsecond
                )
            elif initial_dt.calendar not in ["360_day", "noleap", "all_leap"]:
                raise ValueError(f"Calendar type {initial_dt.calendar} is unsupported for given date offset {self.unit}")

        return self.add_offset_to_datetime(initial_dt=initial_dt,
                                           n=self.magnitude)


def parse_date_offset(offset):
    """Parse a date offset string and return a DatetimeOffset.

    The string is expected to be a magnitude followed by a unit, for
    example "5Y" or "6MS".

    Args:
        offset: The date offset string to parse.

    Returns:
        A ``DatetimeOffset`` built from the parsed unit and magnitude.

    Raises:
        ValueError: If the string contains no numerical value. An
            unsupported unit is rejected by ``DatetimeOffset`` itself.
    """
    match = re.search('[0-9]+', offset)
    # Raise explicitly instead of asserting: assertions are removed when
    # Python runs with -O
    if match is None:
        raise ValueError(
            f"payu: error: no numerical value given for offset: {offset}"
        )
    n = match.group()
    # The unit is whatever follows a leading magnitude. If the digits are
    # not at the start, the whole string is handed on as the unit so that
    # it is rejected as unsupported.
    unit = offset[len(n):] if offset.startswith(n) else offset
    return DatetimeOffset(unit=unit, magnitude=int(n))
77 changes: 56 additions & 21 deletions payu/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import payu.profilers
from payu.runlog import Runlog
from payu.manifest import Manifest
from payu.calendar import parse_date_offset

# Environment module support on vayu
# TODO: To be removed
Expand Down Expand Up @@ -742,28 +743,12 @@ def archive(self):

movetree(self.work_path, self.output_path)

# Remove old restart files
# TODO: Move to subroutine
restart_freq = self.config.get('restart_freq', default_restart_freq)
restart_history = self.config.get('restart_history',
default_restart_history)

# Remove any outdated restart files
prior_restart_dirs = [d for d in os.listdir(self.archive_path)
if d.startswith('restart')]

for res_dir in prior_restart_dirs:

res_idx = int(res_dir.lstrip('restart'))
if (self.repeat_run or
(not res_idx % restart_freq == 0 and
res_idx <= (self.counter - restart_history))):

res_path = os.path.join(self.archive_path, res_dir)

# Only delete real directories; ignore symbolic restart links
if (os.path.isdir(res_path) and not os.path.islink(res_path)):
shutil.rmtree(res_path)
restarts_to_prune = self.prune_restarts()
for restart_path in restarts_to_prune:
# Only delete real directories; ignore symbolic restart links
if (os.path.isdir(restart_path) and not os.path.islink(restart_path)):
shutil.rmtree(restart_path)

# Ensure dynamic library support for subsequent python calls
ld_libpaths = os.environ['LD_LIBRARY_PATH']
Expand Down Expand Up @@ -995,6 +980,56 @@ def sweep(self, hard_sweep=False):
if os.path.islink(self.work_sym_path):
print('Removing symlink {0}'.format(self.work_sym_path))
os.remove(self.work_sym_path)

def prune_restarts(self, from_n_restart=0, to_n_restart=None):
    """Return a list of restart directory paths that can be pruned.

    Restart directories in the archive are sorted by counter and sliced to
    ``[from_n_restart:to_n_restart]`` before deciding what to prune. This
    method only builds the list; it does not delete anything.

    Args:
        from_n_restart: Slice start into the sorted restart directories.
        to_n_restart: Slice end into the sorted restart directories. When
            None, the last ``restart_history`` restarts are excluded.

    Returns:
        A list of paths of restart directories that are candidates for
        removal.
    """
    restart_freq = self.config.get('restart_freq', 5)
    restart_history = self.config.get('restart_history', default_restart_history)

    # All restart directories; entries without a numeric counter suffix
    # are ignored because they cannot be ordered
    prefix = 'restart'
    restarts = [d for d in os.listdir(self.archive_path)
                if d.startswith(prefix) and d[len(prefix):].isdecimal()]
    # Sort restarts based on counter - in increasing date order
    restarts.sort(key=lambda d: int(d[len(prefix):]))

    # Note: from_n_restart and to_n_restart could be useful for inspecting
    # only the more recent restarts
    if to_n_restart is None:
        # Keep the last restart_history restarts. A slice end of -0 would
        # select nothing, so slice to the end when no history is kept.
        to_n_restart = None if restart_history == 0 else -restart_history
    restarts = restarts[from_n_restart:to_n_restart]

    restarts_to_prune = []
    if self.repeat_run:
        # TODO: Previous logic was to prune all restarts if self.repeat_run - is that still the case?
        restarts_to_prune = [os.path.join(self.archive_path, restart)
                             for restart in restarts]
    elif isinstance(restart_freq, int):
        # Using integer frequency to prune restarts: keep every
        # restart whose counter is a multiple of restart_freq
        for restart in restarts:
            restart_idx = int(restart[len(prefix):])
            if not restart_idx % restart_freq == 0:
                restart_path = os.path.join(self.archive_path, restart)
                restarts_to_prune.append(restart_path)
    else:
        # Using date-based frequency to prune restarts
        try:
            date_offset = parse_date_offset(restart_freq)

            next_datetime = None
            for restart in restarts:
                restart_path = os.path.join(self.archive_path, restart)

                # Use model-driver to parse restart files for a datetime
                restart_datetime = self.model.get_restart_datetime(restart_path)

                if next_datetime is not None and restart_datetime < next_datetime:
                    restarts_to_prune.append(restart_path)
                else:
                    # Keep the earliest datetime and use last kept datetime
                    # as point of reference when adding the next time
                    # interval
                    next_datetime = date_offset.add_to_datetime(restart_datetime)
        except Exception as e:
            # Best effort: report the problem and return what was
            # collected before the failure
            print("payu: error occured during date-based restart pruning:", e)

    return restarts_to_prune


def enable_core_dump():
Expand Down
8 changes: 8 additions & 0 deletions payu/models/accessom2.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,11 @@ def archive(self):

def collate(self):
pass

def get_restart_datetime(self, restart_path):
    """Return the restart datetime reported by the mom sub-model.

    The first model in ``self.expt.models`` whose ``model_type`` is 'mom'
    is asked for the datetime of its own sub-directory of ``restart_path``.

    Raises:
        NotImplementedError: If the experiment has no mom sub-model.
    """
    mom_model = next(
        (model for model in self.expt.models if model.model_type == 'mom'),
        None
    )
    if mom_model is None:
        raise NotImplementedError('access-om2 date-based restart pruning currently only uses the mom sub-model to find restart dates')

    mom_restart_path = os.path.join(restart_path, mom_model.name)
    return mom_model.get_restart_datetime(mom_restart_path)
5 changes: 5 additions & 0 deletions payu/models/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,3 +458,8 @@ def profile(self):
if f.endswith('.cubex')][0]
cmd = 'scorep-score {0}'.format(cube_path)
sp.check_call(shlex.split(cmd))

def get_restart_datetime(self, restart_path):
    """Return the datetime of the restart stored under ``restart_path``.

    Intended to parse the restart files and return a cftime or standard
    datetime object for use in date-based restart pruning. This
    implementation has no parsing logic and always raises.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError()
24 changes: 24 additions & 0 deletions payu/models/mom.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

import f90nml
import payu.envmod
import cftime

from payu.models.fms import Fms
from payu.fsops import mkdir_p, make_symlink

Expand Down Expand Up @@ -224,3 +226,25 @@ def create_mask_table(self, input_nml):
land_cells = int(fmask.readline())

return land_cells


def get_restart_datetime(self, restart_path):
    """Return the cftime datetime stored in a restart's ocean_solo.res file.

    Used for date-based restart pruning. The first token of the first
    line is read as an integer calendar type, and the first six tokens of
    the last non-blank line are read as year, month, day, hour, minute and
    second.

    Args:
        restart_path: Directory containing the ``ocean_solo.res`` file.

    Returns:
        A ``cftime.datetime`` on the calendar named by the file.

    Raises:
        ValueError: If the file has no content, names an unknown calendar
            type, or its last line does not hold six integer date fields.
    """
    ocean_solo_path = os.path.join(restart_path, 'ocean_solo.res')
    with open(ocean_solo_path, 'r') as ocean_solo:
        # Skip blank lines so trailing empty lines at the end of the file
        # are not mistaken for the final date line
        lines = [line for line in ocean_solo if line.strip()]

    if not lines:
        raise ValueError(
            f"payu: error: no restart date found in {ocean_solo_path}"
        )

    calendar_int = int(lines[0].split()[0])
    # NOTE(review): cftime treats "gregorian" as the mixed
    # Julian/Gregorian calendar - confirm whether calendar type 3 should
    # map to "proleptic_gregorian" instead.
    cftime_calendars = {
        1: "360_day",
        2: "julian",
        3: "gregorian",
        4: "noleap"
    }
    if calendar_int not in cftime_calendars:
        raise ValueError(
            f"payu: error: unsupported calendar type {calendar_int} "
            f"in {ocean_solo_path}"
        )
    calendar = cftime_calendars[calendar_int]

    last_date_line = lines[-1].split()
    year, month, day, hour, minute, second = [
        int(i) for i in last_date_line[:6]
    ]
    return cftime.datetime(year=year, month=month, day=day,
                           hour=hour, minute=minute, second=second,
                           calendar=calendar)

2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
'yamanifest',
'dateutil',
'tenacity',
'cftime'
],
install_requires=[
'f90nml >= 0.16',
Expand All @@ -44,6 +45,7 @@
'requests[security]',
'python-dateutil',
'tenacity!=7.0.0',
'cftime'
],
tests_require=[
'pytest',
Expand Down
1 change: 1 addition & 0 deletions test/requirements_test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ mnctools
Sphinx
pytest-cov
numpy>=1.16.0
cftime
Loading

0 comments on commit e3b2281

Please sign in to comment.